2017-06-14 66 views
1

也许有人在某个项目中使用了这个:我从Spark中写给Cassandra,在Spark中我使用kafkaUtils.createDirectStream。通过Spark-Cassandra连接器,我们可以使用Dstream.saveToCassandra方法。但保存/追加到HDFS我用:从kafka到hdfs通过火花

stream.map(_.value).foreachRDD(rdd => { 
    val conf = new Configuration() 
    conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/") 
    val fs = FileSystem.get(conf) 
    fs.append(new Path("textfile.txt")) 
    .write(rdd.collect().mkString.getBytes()) 
    fs.close() 
}) 

但我不认为这是做到这一点的最好办法。这可能是更好的使用类似:

val prepStr = { 
    val str = new StringBuilder 
    if (!rdd.isEmpty()) { 
    str.append(rdd.collect().mkString) 
    } 
    str 
} 

最后:

fs.append(path).write(prepStr.mkString.getBytes()) 

或许有人用另一种方式?

回答

1

假设你的流型的DStream[String]你可以使用由数据框作家提供的附加功能:

dstream.foreachRDD{rdd => 
    import sparkSession.implicits._ 
    val df = rdd.toDF() 
    df.write.mode("append").text("/hdfs/path/to/file") 
} 
+0

谢谢,我会尝试这样的方式来测试这个 –

+0

不,它写的文件在路径中,但不附加到现有文件 –

+0

它追加到您可以读回的逻辑文件。但的确,它将由许多内部分区组成。 – maasg