1
我正在关注此网址https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example中提供的有关卡夫卡主题同时消费的示例。同时作为卡夫卡消费者使用多个主题
在创建线程池的部分,它们具有以下代码
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
我可以给topicCountMap添加更多主题。例如,
topicCountMap.put("channel1", new Integer(a_numThreads));
topicCountMap.put("channe2", new Integer(a_numThreads));
topicCountMap.put("channel3", new Integer(a_numThreads));
在上面的代码,在我看来,这些流对象只能映射到的主题
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
我不是如何创建多个数据流对象完全肯定一个,每个映射到给定的主题,然后遍历它们从每个渠道获取数据,并让它们提交给执行者。
为什么你需要每个主题的执行者? – nachokk