1
也许有人在某个项目中使用了这个:我从Spark中写给Cassandra,在Spark中我使用kafkaUtils.createDirectStream
。通过Spark-Cassandra连接器,我们可以使用Dstream.saveToCassandra
方法。但保存/追加到HDFS我用:从kafka到hdfs通过火花
stream.map(_.value).foreachRDD(rdd => {
val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/")
val fs = FileSystem.get(conf)
fs.append(new Path("textfile.txt"))
.write(rdd.collect().mkString.getBytes())
fs.close()
})
但我不认为这是做到这一点的最好办法。这可能是更好的使用类似:
val prepStr = {
val str = new StringBuilder
if (!rdd.isEmpty()) {
str.append(rdd.collect().mkString)
}
str
}
最后:
fs.append(path).write(prepStr.mkString.getBytes())
或许有人用另一种方式?
谢谢,我会尝试这样的方式来测试这个 –
不,它写的文件在路径中,但不附加到现有文件 –
它追加到您可以读回的逻辑文件。但的确,它将由许多内部分区组成。 – maasg