一个典型的卡夫卡消费者如下所示:卡夫卡高级用户0.8.4防止邮件丢失
卡夫卡经纪人--->卡夫卡消费者---->下游消费者喜欢弹性搜索
而且根据对Kafka High Level Consumer的文档:
的“auto.commit.interval.ms”设置更新频率的 消耗偏移写入的ZooKeeper
看来,有可能是邮件丢失,如果以下两件事情:
- 偏移致力于刚过有些消息是从卡夫卡经纪人取回。
- 下游消费者(比如Elastic-Search)未能处理最近一批消息,或者消费者进程本身被终止。
这或许会是最为理想的,如果偏移量不犯自动根据一定的时间间隔,但他们是通过一个API承诺。这将确保kafka-消费者只有在收到来自下游消费者的确认他们已经成功消费消息之后才能发信号通知补偿。可能会有一些消息的重播(如果kafka-消费者在抵消之前死亡),但至少不会有消息丢失。
请让我知道,如果这样的API存在于高级消费者。
注:我知道在卡夫卡的0.8.4版本低层次的消费API的,但我不希望管理自己的一切时,我需要的是高层次的消费只是一个简单的API。
价:
- AutoCommitTask.run(),寻找commitOffsetsAsync
- SubscriptionState.allConsumed()