2017-03-31 51 views
0

我正在使用storm-kafka-client 1.0.3的风暴1.0.1和Kafka 0.10.0.0。KafkaSpout元组重播抛出空指针异常

请在下面找到代码配置。

kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
      kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 


      KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsNamedTopics.Builder(new Fields(fieldNames), topics) 
        .build(); 

      KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500), 
           TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); 


      KafkaSpoutTuplesBuilder tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder(new TestTupleBuilder(topics)) 
         .build(); 

      KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService) 
                   .setOffsetCommitPeriodMs(10_000) 
                   .setFirstPollOffsetStrategy(LATEST) 
                   .setMaxRetries(5) 
                   .setMaxUncommittedOffsets(250) 
                   .build(); 

当我失败的元组没有得到重播。 Spout抛出错误。 请让我知道它为什么抛出空指针异常。

53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Async loop died! 
java.lang.NullPointerException 
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1] 
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1] 
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?] 
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102] 
53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.d.executor - 
java.lang.NullPointerException 
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3] 
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1] 
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1] 
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?] 
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102] 
53527 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Halting process: ("Worker died") 
java.lang.RuntimeException: ("Worker died") 
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.1.jar:1.0.1] 
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.8.0.jar:?] 
    at org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761) [storm-core-1.0.1.jar:1.0.1] 
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271) [storm-core-1.0.1.jar:1.0.1] 
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-core-1.0.1.jar:1.0.1] 
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?] 
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102] 

请在下面找到 {key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer,value.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer,组完整的壶嘴CONFIGS。 id = test-group,ssl.keystore.location = C:/test.jks,bootstrap.servers = localhost:1000,auto.commit.interval.ms = 1000,security.protocol = SSL,enable.auto.commit = true ,ssl.truststore.location = C:/test1.jks,ssl.keystore.password = pass123,ssl.key.password = pass123,ssl.truststore.password = pass123,session.timeout.ms = 30000,auto.offset。重置=最新}

回答

0

风暴1.0.1包含风暴卡夫卡客户端的测试质量。我们已经修复了几个问题,在Storm 1.1版本中提供了更稳定的版本,可以用来对付Kafka 0.10以后的版本。 在您的拓扑中,您​​可以使用适当的版本来依赖storm-kafka-client版本1.1和kafka-clients依赖项。您不需要升级风暴群集本身。

+0

谢谢@Sriharsha Chintalapani我们还需要修复https://github.com/apache/storm/pull/1924。你能告诉我哪个版本的storm-kafka-client应该与Storm 1.0.1一起使用 –

0

我有enable.auto.commit = true使值为false解决了我的问题。