我没有从使用Kafka直接流的队列中获取任何数据。在我的代码中,我把System.out.println()这个语句不运行,这意味着我没有从该主题获取任何数据..使用Java Spark从卡夫卡主题获取的值 - kafka direct stream
我很确定队列中的数据可用,因为没有获取控制台。
我没有在控制台中看到任何错误。
任何人都可以请建议一些东西吗?
这里是我的Java代码,
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
sparkConf.set("spark.streaming.concurrentJobs", "3");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
Collection<String> topics = Arrays.asList("topicName");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> lines = stream
.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
});
lines.print();
// System.out.println(lines.count());
lines.foreachRDD(rdd -> {
rdd.values().foreachPartition(p -> {
while (p.hasNext()) {
System.out.println("Value of Kafka queue" + p.next());
}
});
});
两个思路进行检查:1)做新的数据流进你的话题?默认情况下,您只会收到比您的工作更新的数据。否则,将auto.offset.reset设置为“最早的”2)bootstrap.servers需要与kafka发布的值完全匹配(请参阅kafka broker配置)。如果经纪人公布其DNS名称,并尝试通过IP地址进行连接,则您将收到不是数据,但无错误 –
您是否在您的POM中添加了spark-streaming-kafka jar? – user4342532
我以前在这个集成工作。如果你想任何帮助只是分享你的电子邮件 – user4342532