2016-02-23 66 views
0

我看到下面的代码消费来自卡夫卡的消息。有20个分区有1个主题,使用ExecutorService创建20个线程。每个分区有20个消息流。运行此程序时,将读取20条消息并从主题进行处理。当其中一个线程完成处理时,我假设下一条消息将被读取。消费消费使用卡夫卡消费者 - Java

如果在100个消息位于主题中的示例场景中,将读取所有消息并将其保存在内存中,并且每次都将由线程处理20个消息,或者仅在消息之后才从主题读取消息目前正在处理的线程是否被处理?

public void run(int a_numThreads) { 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(a_numThreads)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

    // now launch all the threads 
    // 
    executor = Executors.newFixedThreadPool(20); 

    // now create an object to consume the messages 
    // 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ConsumerTest(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

编辑:我遇到了这个post的答案。但我有以下问题:

如果有20个分区的单个主题,我可以在2个不同的节点上运行消费者?我应该在每个消费者中提到消息流的数量为10吗?当我节点失败或出现性能问题时,数据流会自动重新平衡到工作节点吗?

回答

1

是的,您可以在不同的节点上运行多个消费者以使用同一主题。基于机器配置,消息流的数量可以不同。如果它的小机器可以给5个左右。

如果一个节点发生故障,它会自动转移到加载到其他节点。除了失败之外,还有其他一些属性,如topic.metadata.refresh.interval.ms,它们决定何时重新平衡加载。

+0

谢谢帕雷希。如果有20个分区,我可以在两个节点上分别设置20个分区吗?如果一个失败,我想所有的20个分区将由一个消费者中的20个线程处理。 –

+0

分区是在创建主题时指定的。如果您的配置中有1个代理,则所有20个分区都在一个代理中。如果你有两个经纪人,分配分配。您不需要指定消费级别的分区数量。所以要回答你的这个问题“如果一个人失败了,我想所有的20个分区将由一个消费者中的20个线程处理”,如果一个失败,所有分区移动到一个节点并由单个消费者处理。 – Paresh

+0

对不起,我不清楚我的问题。假设我的集群和20个分区中有2个节点。我假设20个分区将分布在2个节点上,流量将自行重新平衡。我有一个使用20个线程创建20个消息流并处理数据的消费者项目。如果我在2个节点中部署相同的应用程序,10个分区将由1个实例消费者应用程序处理,其他10个将由另一个实例处理。 –