2017-02-18 98 views
12

我想一个DataFrame保存到HDFS使用DataFrameWriter木地板格式,三个值分割,就像这样:如何在Spark中分区和写入DataFrame而不删除没有新数据的分区?

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path) 

正如this question提到的,partitionBy将在path删除分区的全部现有层次结构和用dataFrame中的分区替换它们。由于特定日期的新增量数据将周期性地发布,我想要的是只替换dataFrame有数据的层次结构中的那些分区,而其他分区不变。

要做到这一点看来我需要保存每个分区单独使用它的完整路径,像这样:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890") 

但是我无法理解来组织数据为单分区的最好办法DataFrame s,这样我就可以用他们的完整路径写出来。一个想法是这样的:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ... 

foreachPartitionIterator[Row]这是不理想的写出来,以平面形式运行。

我还考虑使用select...distinct eventdate, hour, processtime来获取分区列表,然后通过每个分区过滤原始数据帧并将结果保存到其完整分区路径。但是,对于每个分区而言,独特的查询加上一个过滤器似乎并不是非常有效,因为它会进行大量的过滤/写入操作。

我希望有一个更清洁的方法来保存dataFrame没有数据的现有分区?

感谢您的阅读。

Spark版本:2.1

回答

0

您可以尝试模式追加。

dataFrame.write.format("parquet") 
.mode("append") 
.partitionBy("year","month") 
.option("path",s"$path/table_name") 
.saveAsTable(s"stg_table_name") 
1

模式选项Append有一个赶上!

df.write.partitionBy("y","m","d") 
.mode(SaveMode.Append) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName) 

我测试过,看到这会保留现有的分区文件。然而,这次的问题如下:如果你运行相同的代码两次(使用相同的数据),那么它将创建新的parquet文件,而不是用相同的数据替换现有的文件(Spark 1.6)。因此,我们仍然可以用Overwrite来解决这个问题,而不是使用Append。我们应该在分区级覆盖,而不是在表级重写。

df.write.mode(SaveMode.Overwrite) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day) 

请参阅以下链接了解更多信息:

Overwrite specific partitions in spark dataframe write method

(我suriyanto的评论后更新了我的答复日Thnx。)

+0

你测试,如果当你写的一样数据两次取代旧分区?从我的测试中,它实际上在分区目录内创建了一个新的parquet文件,导致数据翻倍。我在Spark 2.2上。 – suriyanto