2014-09-11 74 views
2

我是Java新手,使用博客https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example,尝试开发嵌入式适配器以接收来自Kafka的流。 这是代码的一部分,它假定消费者是单线程的。Kafka简单的消费者 - 在迭代器上获取错误

public void run() { 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(getNumThreads())); 
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<String, String>> streams = consumerMap.get(topic); 

    executor = Executors.newFixedThreadPool(1); 

    final KafkaStream<String, String> stream = streams.get(0); 
    ConsumerIterator<String, String> it = stream.iterator(); 
    while (it.hasNext()) { 
      // fill the tuple and output the tuple 
      fillAndOutputTuple(); 
      } 

我得到这个错误来自Eclipse IDE的it.hasNext(): 类文件迭代器包含一个签名 '(I)Lscala /收集/ Iterator.GroupedIterator;'病态的57位

(奇怪的是,57位上不存在一样,它提供了错误。)

真的很感谢所有帮助

+0

'final KafkaStream stream = streams.get(0);'line?你使用过任何调试器吗? – user2720864 2014-09-11 05:44:53

回答

2

这个错误似乎发生在混合Java代码用Scala编写的库(如Kafka)。你能否验证scala-library-x.yz.jar(x.yz是你使用的Kafka构建所需的Scala版本)是否在你的eclipse项目的构建路径中?请看https://groups.google.com/forum/#!topic/liftweb/f0IAaqoWyu4

+0

非常感谢。添加该lib照顾问题。谢谢。 – FZF 2014-09-11 18:56:08