2016-11-27 108 views
0

2小时后,spark工作正在运行,将一些tgz文件转换为实木复合地板。 作业新数据追加到现有的实木复合地板在S3:如何避免在追加新数据时从S3中读取旧文件?

df.write.mode("append").partitionBy("id","day").parquet("s3://myBucket/foo.parquet") 

在火花提交输出,我可以看到显著的时间花费在阅读旧地板的文件,例如:

16/11/27 14:06:15信息S3NativeFileSystem:打开's3://myBucket/foo.parquet/id=123/day=2016-11-26/part-r-00003-b20752e9-5d70-43f5-b8b4- 50b5b4d0c7da.snappy.parquet'用于阅读

16/11/27 14:06:15信息S3NativeFileSystem:码流 'foo.parquet/id = 123 /天= 2016年11月26日/部分-R-00003-e80419de-7019-4859-bbe7-dcd392f6fcd3.snappy.parquet” 寻求位置 '149195444'

它看起来像这样操作花费小于1第二个文件,但文件数量随时间增加(每个附加文件增加新文件),这使我认为我的代码将无法缩放。

任何想法如何避免从s3读旧的地板文件,如果我只需要添加新的数据?

我使用EMR 4.8.2和DirectParquetOutputCommitter:

sc._jsc.hadoopConfiguration().set('spark.sql.parquet.output.committer.class', 'org.apache.spark.sql.parquet.DirectParquetOutputCommitter') 

回答

1

我通过写数据框以EMR HDFS,然后用S3-DIST-CP上传到检察院S3

解决这个问题
相关问题