2017-09-02 43 views
4

我是新来的卡夫卡在以前的分区中的偏移的样子。目前我这个Channel Consumer example从汇合公司的Github上回购如何快退和使用卡夫卡转到客户的消费

据我所知试验,消费者可分为多个组。每个组在分区中都有自己的偏移量。假设我在特定主题中有40条消息,我们称之为owner_commands。属于狗群的消费者加入并开始消费这40条消息。

当我断开并重新连接这种消费,我注意到,消息显示不出来了。它说我已经到达文件的末尾。但是,如果我将该群集与属于不同群组的其他消费者(如猫)加入群集,我可以再次阅读这40条消息。

你知道,如果有消费者犬群中快退和再次使用卡夫卡的Go API重放这些消息的方式。我查看了Kafka Golang API的源代码,找不到任何指示我可以倒带并查看过去特定消息的任何内容。

谢谢

+1

您可能要检查https://github.com/Shopify/sarama这是卡夫卡流行的库 - 它允许您指定特定的偏移,你希望从阅读,或接受“最老”的开始点或'最新' – Andrew

回答

1

你可以使用CommitOffsets,只是承诺还给你要退给抵消。下一次轮询将从该偏移量开始。

CommitOffsets记录在这里: http://docs.confluent.io/current/clients/confluent-kafka-go/index.html#Consumer.CommitOffsets

外API的,有一个在卡夫卡的消费群体的功能命令移动消费群体的立场。这与Apache Kafka 0.11一起发布。

+0

谢谢你的回答,现在我对卡夫卡的工作原理有了更好的理解。我只是意识到,我必须将enable.auto.commit设置为false才能使其工作。使用我之前的例子,对于我的狗群,我在owner_commands [0] @ 0上执行提交,当我断开我的使用者并重新连接它时,从[0] @ 0到最近的消息的所有消息都会重播! 但是,我想知道更多关于移动消费者群体的命令的命令。为什么它在API之外?我怎样才能访问? – cfeng

+1

kafka-consumer-groups命令由shell脚本执行,并附带Kafka包。你可以在bin目录下找到它。它更像是一个经纪人端操作,所以它不在API中。它实际上使用Java消费者API来完成它的功能。 – dawsaw