2016-09-19 173 views
4

我有一个主题列表(现在它是10),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消费每个主题,但在我的情况下,如果主题数量增加,那么消耗主题的线程数就会增加,这是我不想要的,因为主题不是会频繁获取数据,因此这些线程将处于理想状态。卡夫卡消费者为多个主题

是否有任何方法可让单个消费者从所有主题中消费?如果是的话,那我们该如何实现呢? Kafka也将如何维持抵消?请提出答案。

回答

4

我们可以通过下面的API为多个主题订阅: consumer.subscribe(Arrays.asList(TOPIC1,标题2),ConsumerRebalanceListener OBJ)

消费者对该主题的信息,我们可以使用COMIT或consumer.commitAsync消费者.commitSync()通过创建OffsetAndMetadata对象,如下所示。

ConsumerRecords<String, String> records = consumer.poll(long value); 
for (TopicPartition partition : records.partitions()) { 
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
    for (ConsumerRecord<String, String> record : partitionRecords) { 
     System.out.println(record.offset() + ": " + record.value()); 
    } 
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
} 
+0

我知道,我们可以,但卡夫卡如何保持胶印?另外,拥有一个消费者群体能够解决我的问题吗? – Apollo

+1

偏移量由您的应用提交并存储在一个名为__consumer_offsets的特殊偏移量kafka主题中。为每个主题的每个分区保留偏移量,因此与您订阅的主题数量无关。 –

1

不需要多个线程,您可以让一个消费者从多个主题中消费。 偏移量由zookeeper维护,因为kafka-server本身是无状态的。 每当消费者消费一条消息时,其偏移量就会与动物园管理员合作,以保留将来的消息只处理每条消息一次。因此,即使在kafka失败的情况下,消费者也会从最后一次提交的抵消开始消费。

+1

从Kafka 0.9及以上版本开始,偏移量将存储在Kafka主题中,而不是zookeeper –