2016-06-11 62 views
3

我试图使用Spark将文本文件导出到Postgres数据库。我正在使用下面的一段代码来导出单个文本文件。我在同一个文件夹中有近200个文本文件,每个文本文件具有相同的结构。不幸的是,一年的价值不是我的输入文件的一部分,因此我很难编码它。使用Spark将文本文件导出到PostgreSQL - 自动化

我希望一次上传所有这些文件,但不知道该怎么做,有人有什么建议吗?

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 

lines = sc.textFile("/aaaa/bbbb/DataFile/t-1870.txt") 
splits = lines.map(lambda l: l.split(",")) 
raw_data = splits.map(lambda b: Row(name=b[0], gender=b[1],count=int(b[2]),year=int(1870))) 

schemaBabies = sqlContext.createDataFrame(raw_data) 
schemaBabies.registerTempTable("raw_data") 

df = sqlContext.sql("select * from raw_data") 

pgurl="jdbc:postgresql://localhost:5432/sparkling?user=XXXX&password=XXXX" 
properties={"user":"XXXX","password":"XXXX","driver":"org.postgresql.Driver","mode":"append"} 

df.write.jdbc(url = pgurl ,table = "EDW.raw_data",properties=properties) 

回答

2

让我们假设你的数据是这样的:

import csv 
import tempfile 
import os 

out = tempfile.mkdtemp() 
data = [ 
    ("1870", [("Jane Doe", "F", 3)]), 
    ("1890", [("John Doe", "M", 1)]), 
] 

for year, rows in data: 
    with open(os.path.join(out, "t-{0}.txt".format(year)), "w") as fw: 
     csv.writer(fw).writerows(rows) 

开始PySpark会议或提交脚本传递正确spark-csv--packages参数,负载数据与指定模式:

from pyspark.sql.types import * 

schema = StructType([ 
    StructField("name", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("count", LongType(), True) 
]) 

df = (sqlContext.read.format("com.databricks.spark.csv") 
    .schema(schema) 
    .load(out)) 

提取物年从文件名中写入:

from pyspark.sql.functions import input_file_name, regexp_extract 

df_with_year = (df.withColumn(
    "year", 
    regexp_extract(input_file_name(), "[1-2][0-9]{3}", 0).cast("int"))) 

df_with_year.show() 
## +--------+------+-----+----+ 
## | name|gender|count|year| 
## +--------+------+-----+----+ 
## |John Doe|  M| 1|1890| 
## |Jane Doe|  F| 3|1870| 
## +--------+------+-----+----+ 

df_with_year.write.jdbc(...) 

重要:在Spark < 2.0中,此方法依赖于不在Python和JVM之间传递数据。它将无法​​与Python UDF或DataFrame.rdd.map工作

+1

我确实根据您的输入对我的代码进行了一些更改,我可以将所有200多个文本文件加载到数据库中。真的很感激你的帮助。 – ytasfeb15

相关问题