0
我看到这个便条卡夫卡消费文档中 -卡夫卡消费者 - Java客户端
因为有许多的分区,这还是平衡了许多 消费者的情况下的负载。不过请注意,不能有比分区更多的实例。
我有50个分区为单个主题。如果我将a_numThreads值设置为50,则从每个分区中获取1条消息?上述消息是否意味着我无法在任何时候创建超过50个线程?
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}