
Spark Kafka receiver does not pick up data from all partitions

I created a Kafka topic with 5 partitions, and I am consuming it with the createStream receiver API as shown below. Somehow only one receiver gets any input data; the rest of the receivers never process anything. Can you help?

JavaPairDStream<String, String> messages = null;

if (sparkStreamCount > 0) {
    // We create an input DStream for each partition of the topic, unify those streams, and then repartition the unified stream.
    List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
    for (int i = 0; i < sparkStreamCount; i++) {
        kafkaStreams.add(KafkaUtils.createStream(jssc, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
    }
    messages = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    messages = KafkaUtils.createStream(jssc, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
}

[Screenshot: Spark UI showing the multiple receivers]

After making the changes, I am getting the following exception:

INFO : org.apache.spark.streaming.kafka.KafkaReceiver - Connected to localhost:2181 
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopping receiver with message: Error starting receiver 0: java.lang.AssertionError: assertion failed 
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver onStop 
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Deregistering receiver 0 
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.AssertionError: assertion failed 
    at scala.Predef$.assert(Predef.scala:165) 
    at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:36) 
    at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:34) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) 
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
    at kafka.consumer.TopicCount$class.makeConsumerThreadIdsPerTopic(TopicCount.scala:34) 
    at kafka.consumer.StaticTopicCount.makeConsumerThreadIdsPerTopic(TopicCount.scala:100) 
    at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:104) 
    at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:198) 
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138) 
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1986) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1986) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopped receiver 0 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopping BlockGenerator 
INFO : org.apache.spark.streaming.util.RecurringTimer - Stopped timer for BlockGenerator after time 1473964037200 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Waiting for block pushing thread to terminate 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Pushing out the last 0 blocks 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopped block pushing thread 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopped BlockGenerator 
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped 
ERROR: org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopped receiver with error: java.lang.AssertionError: assertion failed 
ERROR: org.apache.spark.executor.Executor - Exception in task 0.0 in stage 29.0 

Answer


There is a problem with the code above. In KafkaUtils.createStream, the kafkaTopicMap argument specifies the "Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread" — that is, the value is the number of consumer threads to start for that topic. If that count is ever zero or negative, the Kafka consumer fails with exactly the assertion error shown above (kafka.consumer.TopicCount asserts that each topic gets at least one consumer thread).

Try the following code:

JavaPairDStream<String, String> messages = null;
int sparkStreamCount = 5; // one receiver per partition of the 5-partition topic
Map<String, Integer> kafkaTopicMap = new HashMap<String, Integer>();
// Each createStream call below consumes the topic with a single thread;
// running sparkStreamCount receivers together covers all partitions.
kafkaTopicMap.put(topic, 1);

if (sparkStreamCount > 0) {
    List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
    for (int i = 0; i < sparkStreamCount; i++) {
        kafkaStreams.add(KafkaUtils.createStream(streamingContext, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
    }

    messages = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

} else {
    messages = KafkaUtils.createStream(streamingContext, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
}
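
For comparison, the receiver API also supports a single receiver that consumes with several threads. This is only a minimal sketch, reusing the topic, streamingContext, and contextVal placeholders from the snippet above:

// Single-receiver alternative: one Kafka consumer with one thread per partition.
// topic, streamingContext, and contextVal are the same placeholders as above.
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 5); // the topic has 5 partitions, so request 5 consumer threads

JavaPairDStream<String, String> messages = KafkaUtils.createStream(
    streamingContext,
    contextVal.getString(KAFKA_ZOOKEEPER),
    contextVal.getString(KAFKA_GROUP_ID),
    topicMap);

Note that extra consumer threads only parallelize reading inside that one receiver, which runs on a single executor; the union of several single-threaded receivers, as in the code above, is what spreads ingestion across executors.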

Thanks a lot, Hokam, for your response. I made the changes you suggested, but now it is not processing anything; it just keeps printing the following messages: – Alchemist


INFO : org.apache.spark.streaming.receiver.BlockGenerator - Pushed block input-2-1473954491600
INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(213) called with curMem=487780, maxMem=556038881
INFO : org.apache.spark.storage.MemoryStore - Block input-2-1473954496800 stored as bytes in memory (estimated size 213.0 B, free 529.8 MB)
INFO : org.apache.spark.storage.BlockManagerInfo - Added input-2-1473954496800 in memory on localhost:53678 (size: 213.0 B, free: 530.2 MB)
WARN : org.apache.spark.storage.BlockManager - Block input-2-1473954496800 replicated to only 0 peer(s) – Alchemist


Map<String, Integer> kafkaTopicMap = new HashMap<String, Integer>(); kafkaTopicMap.put(contextVal.getString(KAFKA_HPD_TOPICS), sparkStreamCount); List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount); for (int i = 0; i … – Alchemist
