2017-09-26 58 views
2

我已经使用完整模式在流式数据帧上应用聚合。为了在本地保存数据帧,我实现了foreach接收器。我能够以文本形式保存数据帧。但我需要将它保存为镶木地板格式。如何将完整输出模式下的流聚合保存为实木复合地板?

val writerForText = new ForeachWriter[Row] { 
    var fileWriter: FileWriter = _ 

    override def process(value: Row): Unit = { 
     fileWriter.append(value.toSeq.mkString(",")) 
    } 

    override def close(errorOrNull: Throwable): Unit = { 
     fileWriter.close() 
    } 

    override def open(partitionId: Long, version: Long): Boolean = { 
     FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}")) 
     fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp")) 
     true 

    } 
    } 

val columnName = "col1" 
frame.select(count(columnName),count(columnName),min(columnName),mean(columnName),max(columnName),first(columnName), last(columnName), sum(columnName)) 
       .writeStream.outputMode(OutputMode.Complete()).foreach(writerForText).start() 

我该如何做到这一点? 在此先感谢!

+0

你找到了一种方法来保存完整/更新模式,以实木复合地板? –

+0

是的,我写了使用https://github.com/chtefi/parquet-custom-reader-writer –

回答

-1

为了在本地保存数据帧,我实现了foreach接收器。我能够以文本形式保存数据帧。但我需要将它保存为镶木地板格式。

保存流式数据集时的默认格式是... 实木复合地板。这就是说,你不必使用相当先进的接收器,而只需要使用parquet

查询可能如下:

scala> :type in 
org.apache.spark.sql.DataFrame 

scala> in.isStreaming 
res0: Boolean = true 

in.writeStream. 
    option("checkpointLocation", "/tmp/checkpoint-so"). 
    start("/tmp/parquets") 
+0

写自定义作家结构化的流不允许我们写数据框到任何接收器在完整的模式,除了内存。为了保存它,我们必须实现foreach接收器。我们无法以您建议的完整模式进行此操作。 –

+0

哎哟......你可能是对的......太急于回答这个问题了。让我想一想...... –

+0

@MaheshChandKandpal由于你的接收器是一个文件,所以在Jacek的答案中使用** File Sink **和追加模式是有意义的。 您与Foreach Sink的解决方案似乎被推翻 –

相关问题