0
在我的Spark应用程序中。我从两个Kafka主题创建两个DStream。我这样做,因为我需要以不同的方式处理两个DStream。以下是代码示例:在Spark Streaming中创建两个来自Kafka的DStream主题不起作用
object KafkaConsumerTest3 {
var sc:SparkContext = null
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
val Array(zkQuorum, group, topics1, topics2, numThreads) = Array("localhost:2181", "group3", "test_topic4", "test_topic5","5")
val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]")
sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
val topicMap1 = topics1.split(",").map((_, numThreads.toInt)).toMap
val topicMap2 = topics2.split(",").map((_, numThreads.toInt)).toMap
val lines2 = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap2).map(_._2)
val lines1 = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap1).map(_._2)
lines2.foreachRDD{rdd =>
rdd.foreach { println }}
lines1.foreachRDD{rdd =>
rdd.foreach { println }}
ssc.start()
ssc.awaitTermination()
}
}
这两个主题都可能有也可能没有数据。在我的情况下,第一个主题目前没有获取数据,但第二个主题正在获得。但我的火花应用程序不打印任何数据。也没有例外。 有什么我失踪?或者我如何解决这个问题。
它是否适用于单流? – Natalia
是的..它可以与一个流一起使用。 – Alok
@Alok:你可以尝试打印在foreachRDD方法里面的rdd.count? – Shankar