2017-07-28 90 views
0

我通过以下代码成功地将文件从S3加载到Spark中。它正在工作,但是我注意到1个文件和另一个文件之间存在延迟,并且它们按顺序加载。我想通过并行加载来改善这一点。并行加载S3文件Spark

 // Load files that were loaded into firehose on this day 
    var s3Files = spark.sqlContext.read.schema(schema).json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").rdd 

    // Apply the schema to the RDD, here we will have duplicates 
    val usersDataFrame = spark.createDataFrame(s3Files, schema) 

    usersDataFrame.createOrReplaceTempView("results") 

    // Clean and use partition by the keys to eliminate duplicates and get latest record 
    var results = spark.sql(buildCleaningQuery(job, "results")) 
    results.createOrReplaceTempView("filteredResults") 
    val records = spark.sql("select count(*) from filteredResults") 

我也试图通过文本文件()方法加载,但然后我有问题转化RDD [字符串]以RDD [行],因为后来我就需要移动使用SQL星火。我以如下方式使用它;

 var s3Files = sparkContext.textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").toJavaRDD() 

什么是加载JSON文件(每个大约50MB的多个文件)到Spark的理想方式?我想根据模式验证属性,所以稍后我将能够使用Spark SQL查询来清理数据。

+2

是否有必要将s3Files更改为rdd?如果你不把它改成rdd,我相信它会并行地提取文件内容。 – wllmtrng

+0

最终结果将是一个数据框,然后在其上运行Spark SQL查询并保存到红移。在不转换为RDD的情况下,我将无法遵循该逻辑,除非我错过了某些东西。 – Mez

+1

var s3Files = spark.sqlContext.read.schema(schema).json(...)。createOrReplaceTempView(“results”) 应该足够。尝试一下,看看它是否仍然顺序读取 – wllmtrng

回答

2

发生了什么是DataFrame被转换成RDD然后再转换成DataFrame,然后丢失分区信息。

var s3Files = spark 
    .sqlContext 
    .read.schema(schema) 
    .json(...) 
    .createOrRepla‌​ceTempView("results"‌​) 

应该足够了,分区信息应该仍然存在,允许json文件同时加载。