我是Java新手,使用博客https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example,尝试开发嵌入式适配器以接收来自Kafka的流。 这是代码的一部分,它假定消费者是单线程的。Kafka简单的消费者 - 在迭代器上获取错误
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(getNumThreads()));
Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<String, String>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(1);
final KafkaStream<String, String> stream = streams.get(0);
ConsumerIterator<String, String> it = stream.iterator();
while (it.hasNext()) {
// fill the tuple and output the tuple
fillAndOutputTuple();
}
我得到这个错误来自Eclipse IDE的it.hasNext(): 类文件迭代器包含一个签名 '(I)Lscala /收集/ Iterator.GroupedIterator;'病态的57位
(奇怪的是,57位上不存在一样,它提供了错误。)
真的很感谢所有帮助
'final KafkaStream stream = streams.get(0);'line?你使用过任何调试器吗? –
user2720864
2014-09-11 05:44:53