0
我通过以下代码成功地将文件从S3加载到Spark中。它正在工作,但是我注意到1个文件和另一个文件之间存在延迟,并且它们按顺序加载。我想通过并行加载来改善这一点。并行加载S3文件Spark
// Load files that were loaded into firehose on this day
var s3Files = spark.sqlContext.read.schema(schema).json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").rdd
// Apply the schema to the RDD, here we will have duplicates
val usersDataFrame = spark.createDataFrame(s3Files, schema)
usersDataFrame.createOrReplaceTempView("results")
// Clean and use partition by the keys to eliminate duplicates and get latest record
var results = spark.sql(buildCleaningQuery(job, "results"))
results.createOrReplaceTempView("filteredResults")
val records = spark.sql("select count(*) from filteredResults")
我也试图通过文本文件()方法加载,但然后我有问题转化RDD [字符串]以RDD [行],因为后来我就需要移动使用SQL星火。我以如下方式使用它;
var s3Files = sparkContext.textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").toJavaRDD()
什么是加载JSON文件(每个大约50MB的多个文件)到Spark的理想方式?我想根据模式验证属性,所以稍后我将能够使用Spark SQL查询来清理数据。
是否有必要将s3Files更改为rdd?如果你不把它改成rdd,我相信它会并行地提取文件内容。 – wllmtrng
最终结果将是一个数据框,然后在其上运行Spark SQL查询并保存到红移。在不转换为RDD的情况下,我将无法遵循该逻辑,除非我错过了某些东西。 – Mez
var s3Files = spark.sqlContext.read.schema(schema).json(...)。createOrReplaceTempView(“results”) 应该足够。尝试一下,看看它是否仍然顺序读取 – wllmtrng