1

我没有从使用Kafka直接流的队列中获取任何数据。在我的代码中,我把System.out.println()这个语句不运行,这意味着我没有从该主题获取任何数据..使用Java Spark从卡夫卡主题获取的值 - kafka direct stream

我很确定队列中的数据可用,因为没有获取控制台。

我没有在控制台中看到任何错误。

任何人都可以请建议一些东西吗?

这里是我的Java代码,

SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]"); 
     sparkConf.set("spark.streaming.concurrentJobs", "3"); 

     // Create the context with 2 seconds batch size 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000)); 

     Map<String, Object> kafkaParams = new HashMap<>(); 
     kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9092"); 
     kafkaParams.put("key.deserializer", StringDeserializer.class); 
     kafkaParams.put("value.deserializer", StringDeserializer.class); 
     kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); 
     kafkaParams.put("auto.offset.reset", "latest"); 
     kafkaParams.put("enable.auto.commit", true); 

     Collection<String> topics = Arrays.asList("topicName"); 

     final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, 
       LocationStrategies.PreferConsistent(), 
       ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)); 


     JavaPairDStream<String, String> lines = stream 
       .mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() { 
        @Override 
        public Tuple2<String, String> call(ConsumerRecord<String, String> record) { 

         return new Tuple2<>(record.key(), record.value()); 
        } 
       }); 

     lines.print(); 

     // System.out.println(lines.count()); 
     lines.foreachRDD(rdd -> { 
      rdd.values().foreachPartition(p -> { 
       while (p.hasNext()) { 
        System.out.println("Value of Kafka queue" + p.next()); 
       } 
      }); 
     }); 
+0

两个思路进行检查:1)做新的数据流进你的话题?默认情况下,您只会收到比您的工作更新的数据。否则,将auto.offset.reset设置为“最早的”2)bootstrap.servers需要与kafka发布的值完全匹配(请参阅kafka broker配置)。如果经纪人公布其DNS名称,并尝试通过IP地址进行连接,则您将收到不是数据,但无错误 –

+0

您是否在您的POM中添加了spark-streaming-kafka jar? – user4342532

+0

我以前在这个集成工作。如果你想任何帮助只是分享你的电子邮件 – user4342532

回答

0

@Vimal这里是一个link在斯卡拉创造直接流的工作版本。

我相信在Scala中查看它之后,你必须很容易地转换它。

请确保您关闭以阅读卡夫卡的最新主题。它可能不会选择上次处理的任何主题。

+0

我评论到“kafkaParams.put(”auto.offset.reset“,”latest“ );”仍然无法得到结果 –

+0

Scala代码和Java代码是完全不同的..我没有得到任何解决方案从给定的Scala –

1

我能够打印的使用直接卡夫卡流从卡夫卡队列取串..

这里是我的代码,

import java.util.HashMap; 
import java.util.HashSet; 
import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 
import java.nio.file.StandardOpenOption; 
import java.util.Arrays; 
import java.util.Calendar; 
import java.util.Collection; 
import java.util.Currency; 
import java.util.Iterator; 
import java.util.List; 
import java.util.Map; 
import java.util.Set; 
import java.util.concurrent.atomic.AtomicReference; 
import java.util.regex.Pattern; 

import scala.Tuple2; 

import kafka.serializer.StringDecoder; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.*; 
import org.apache.spark.streaming.api.java.*; 
import org.apache.spark.streaming.kafka.HasOffsetRanges; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.kafka.OffsetRange; 
import org.json.JSONObject; 
import org.omg.CORBA.Current; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.Durations; 

public final class KafkaConsumerDirectStream { 

    public static void main(String[] args) throws Exception { 

     try { 
      SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]"); 
      sparkConf.set("spark.streaming.concurrentJobs", "30"); 

      // Create the context with 2 seconds batch size 
      JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(200)); 

      Map<String, String> kafkaParams = new HashMap<>(); 
      kafkaParams.put("metadata.broker.list", "x.xx.xxx.xxx:9091"); 

      Set<String> topics = new HashSet(); 
      topics.add("PartWithTopic02Queue"); 

      JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, 
        String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 

      JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
       @Override 
       public String call(Tuple2<String, String> tuple2) { 
        return tuple2._2(); 
       } 
      }); 

      lines.foreachRDD(rdd -> { 

       if (rdd.count() > 0) { 
        List<String> strArray = rdd.collect(); 

        // Print string here 
       } 
      }); 

      jssc.start(); 
      jssc.awaitTermination(); 
     } 
    } 
    catch (Exception e) { 
      e.printStackTrace(); 
     } 
}