2017-07-28 37 views
0

随着像 df.write.csv("s3a://mybucket/mytable")df.write.csv("s3a://mybucket/mytable") 我很明白知道哪里写文件/对象,但由于S3的最终一致性保证,我不能100%确定从该位置获取列表将返回所有(甚至是任何)刚刚写入的文件。如果我能得到刚才写的文件/对象列表,那么我可以为Redshift COPY命令准备一个清单文件,而不用担心最终的一致性。这是可能的 - 如果是这样的话?是否有可能在DataFrame写入时检索文件列表,或者是否有火花将其存储在某个地方?

回答

1

spark-redshift库可以为您处理此问题。如果你想自己做,你可以看看他们是如何在这里做到这一点:https://github.com/databricks/spark-redshift/blob/1092c7cd03bb751ba4e93b92cd7e04cffff10eb0/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala#L299

编辑:我避免对一致性的更多担心通过df.coalesce(fileCount)输出已知数量的文件部分(红移你想多您的群集中的切片)。然后,您可以检查Spark代码中列出了多少个文件,以及Redshift stl_load_commits中加载了多少个文件。

+0

这仍在调用FileSystem.listStatus(),因此易受S3列表不一致性的影响。 –

+0

添加关于使用'coalesce()'输出已知文件数的注意事项 –

+1

担心spark-redshift库选择忽略问题,但需要一些有趣的解决方法。感觉Spark应该真的有一个功能,可以通过驱动程序写出一个清单 - 一个文件,而不是工作人员的目录;那将解决这个问题。 – SourceSimian

0

很好意识到一致性风险;您可以通过延迟创建可见性和已找到的已删除对象在列表中找到它。 AFAIK,无法获取创建的文件列表,因为它的任务可以在任务中生成他们想要的任何内容到任务输出目录,然后将其编组(通过列表和复制)到最终输出目录中,

在S3上面没有一致性层(S3mper,s3guard等)的情况下,您可以读&旋转“稍微”一点,以便让分片追上。我对“一点点”的好价值不知道。

但是,如果您打电话给fs.write.csv(),可能是因为在用于将任务输出传播到作业目录的提交程序中列出不一致而被捕获;这是通过列表+复制在S3A中完成的,请参阅。

+0

为什么选票?我错了什么?如果我告诉细节,我会纠正它。 –

+0

我不知道为什么你得到了downvote,你提供了很多关于我不知道存在的一致性层的很好的信息(我只知道EMRFS)。尽管你最后一句话看起来像是在句子中间结束了。 – SourceSimian

相关问题