2015-09-25 210 views

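Consuming multiple topics at the same time as a Kafka consumer

I am following the example at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example on consuming a Kafka topic concurrently.

In the part that creates the thread pool, they have the following code: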

public void run(int a_numThreads) { 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(a_numThreads)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 


    // now launch all the threads 
    // 
    executor = Executors.newFixedThreadPool(a_numThreads); 

    // now create an object to consume the messages 
    // 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
} 

I can add more topics to topicCountMap. For example:

topicCountMap.put("channel1", new Integer(a_numThreads));
topicCountMap.put("channel2", new Integer(a_numThreads));
topicCountMap.put("channel3", new Integer(a_numThreads));

In the code above, it seems to me that the streams object can only be mapped to one of the topics:

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

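I am not entirely sure how to create multiple stream objects, each mapped to a given topic, and then iterate over them to fetch data from each channel and submit them to the executor.

Answer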

Suppose you have:

String topic1 = "channel1"; 
String topic2 = "channel2"; 
String topic3 = "channel3"; 

Then you can indeed do this:

topicCountMap.put(topic1, new Integer(a_numThreads_topic1)); 
topicCountMap.put(topic2, new Integer(a_numThreads_topic2)); 
topicCountMap.put(topic3, new Integer(a_numThreads_topic3)); 

Once you have the consumerMap (that part of the code does not change), you can retrieve the streams for each topic:

List<KafkaStream<byte[], byte[]>> topic1_streams = consumerMap.get(topic1); 
List<KafkaStream<byte[], byte[]>> topic2_streams = consumerMap.get(topic2); 
List<KafkaStream<byte[], byte[]>> topic3_streams = consumerMap.get(topic3); 

To consume from the streams, create an executor for each topic with the right number of threads:

executors_topic1 = Executors.newFixedThreadPool(a_numThreads_topic1); 
executors_topic2 = Executors.newFixedThreadPool(a_numThreads_topic2); 
executors_topic3 = Executors.newFixedThreadPool(a_numThreads_topic3); 
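If the number of topics grows, the three separate variables above could be replaced by a map keyed by topic. The following is only a minimal sketch under that assumption: `PerTopicExecutors`, `create` and `shutdownAll` are hypothetical names, not part of Kafka or of the wiki example, and the sketch uses only `java.util.concurrent`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper (not part of Kafka): one fixed-size pool per topic.
class PerTopicExecutors {

    // Builds one pool per entry, sized by that topic's thread count,
    // i.e. the same numbers that were put into topicCountMap.
    static Map<String, ExecutorService> create(Map<String, Integer> topicCountMap) {
        Map<String, ExecutorService> executors = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : topicCountMap.entrySet()) {
            executors.put(entry.getKey(), Executors.newFixedThreadPool(entry.getValue()));
        }
        return executors;
    }

    // Asks every pool to stop accepting new work.
    static void shutdownAll(Map<String, ExecutorService> executors) {
        for (ExecutorService executor : executors.values()) {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> topicCountMap = new LinkedHashMap<>();
        topicCountMap.put("channel1", 2);
        topicCountMap.put("channel2", 1);

        Map<String, ExecutorService> executors = create(topicCountMap);
        System.out.println("pools created for: " + executors.keySet());
        shutdownAll(executors);
    }
}
```

With this, `executors.get(topic1)` would play the role of `executors_topic1`.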

Finally:

// Submit one ConsumerTest per stream; threadNumber keeps counting across topics.
int threadNumber = 0;
for (final KafkaStream stream : topic1_streams) {
    executors_topic1.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}
for (final KafkaStream stream : topic2_streams) {
    executors_topic2.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}
for (final KafkaStream stream : topic3_streams) {
    executors_topic3.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
}

Of course, this is only meant to give you the idea. Obviously, the code can be improved.
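One possible improvement, sketched here rather than taken from the wiki example, is to loop over the whole map of streams and submit everything to a single pool sized to the total number of streams. `MultiTopicSubmitter`, `countStreams` and `submitAll` are hypothetical names. The type parameter `S` stands in for `KafkaStream<byte[], byte[]>`, so the sketch compiles without Kafka on the classpath, and it assumes Java 8 or later for the lambdas.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical helper (not part of Kafka). S stands in for KafkaStream<byte[], byte[]>.
class MultiTopicSubmitter {

    // Total number of streams across all topics; used to size a single pool.
    static <S> int countStreams(Map<String, List<S>> streamsByTopic) {
        int total = 0;
        for (List<S> streams : streamsByTopic.values()) {
            total += streams.size();
        }
        return total;
    }

    // Submits one worker per stream, numbering workers consecutively across topics.
    static <S> List<Future<?>> submitAll(Map<String, List<S>> streamsByTopic,
                                         ExecutorService executor,
                                         BiFunction<S, Integer, Runnable> workerFactory) {
        List<Future<?>> futures = new ArrayList<>();
        int threadNumber = 0;
        for (List<S> streams : streamsByTopic.values()) {
            for (S stream : streams) {
                futures.add(executor.submit(workerFactory.apply(stream, threadNumber)));
                threadNumber++;
            }
        }
        return futures;
    }

    public static void main(String[] args) throws Exception {
        // Strings stand in for Kafka streams so the sketch runs without a broker.
        Map<String, List<String>> streamsByTopic = new LinkedHashMap<>();
        streamsByTopic.put("channel1", Arrays.asList("c1-s0", "c1-s1"));
        streamsByTopic.put("channel2", Arrays.asList("c2-s0"));

        ExecutorService executor = Executors.newFixedThreadPool(countStreams(streamsByTopic));
        List<Future<?>> futures = submitAll(streamsByTopic, executor,
                (stream, n) -> () -> System.out.println("worker " + n + " reads " + stream));
        for (Future<?> future : futures) {
            future.get(); // wait for each stand-in worker to finish
        }
        executor.shutdown();
    }
}
```

With the real consumer, the call would presumably look like `submitAll(consumerMap, executor, (stream, n) -> new ConsumerTest(stream, n))`, assuming `ConsumerTest` implements `Runnable` as in the wiki example. The pool should have at least as many threads as there are streams, since each worker in that example keeps reading from its stream until the consumer is shut down.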

Why do you need an executor per topic? – nachokk