我基本上想要使用来自Kafka的数据并将其写入HDFS。但发生的情况是,它不是在hdfs中编写任何文件。它会创建空文件。来自Kafka的Spark流式传输和Avro格式的HDFS写入
而且请指导我,如果我想写在HDF格式的HDFS我如何修改代码。
为了简单起见,我写了本地C盘。
import org.apache.spark.SparkConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaStreaming extends App{
val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("kafka-streaming")
val conext = new SparkContext(conf)
val ssc = new StreamingContext(conext, org.apache.spark.streaming.Milliseconds(1))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean))
val topics = Array("topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
val lines = stream.map(_.value)
stream.foreachRDD(rdd => {
rdd.coalesce(1).saveAsTextFile("C:/data/spark/")
})
ssc.start()
ssc.awaitTermination()}
而且下面是build.sbt
name := "spark-streaming"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-
10_2.11" % "2.2.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.1"
它所需要的是一个过滤器,以检查是否该批次是空.. stream.map(_。值).foreachRDD(RDD => { rdd.foreach(的println ) 如果){ rdd.saveAsTextFile( “C:/数据/火花/”) }(rdd.isEmpty(!) }) 但我仍然面对的问题是,新一批覆盖旧数据。我希望所有的数据被追加到文件中.. –