0

如何保存卡夫卡星火消息流数据帧到单个文件如何保存卡夫卡星火消息流数据帧到单个文件

我已经制定,这将消耗使用Kafka-星火流过程中的信息的应用程序。

一旦收到数据,它就会转换成数据帧。

然后流式数据帧被保存为文本文件,这里数据帧被保存到每个文件中,用于每个kafka流消息,下面是我用于将数据保存为文本文件的代码,这是保存数据为每条消息添加多个文本文件。

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append") 
           .save("path") 

在这里,我想实现的是流日期框架的要求需要保存为每个卡夫卡消息的单个文件,如果可能的话,请帮我解决。

在此先感谢

回答

0

下面的代码可能会对您有所帮助。只需生成RDD列表,然后将其合并即可。

var dStreamRDDList = new ListBuffer[RDD[String]] 
dStream.foreachRDD(rdd => 
    { 
     dStreamRDDList += rdd 
    }) 
val joinRDD = ssc.sparkContext.union(dStreamRDDList) 
//then convert joinRDD to DataFrame (DF) 
DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append") 
          .save("path") 
相关问题