2017-08-14 75 views
0

kafka流处理在我们的系统中执行以进行事务处理。解决方案如下实现,流处理后从kafka主题中移除邮件

卡夫卡制作者向卡夫卡主题发布事件,流处理器处理输入事件并执行聚合操作。流处理之后,该事件将发布到另一个主题。由于在第一个主题中没有实现消费者,我如何从第一个主题中删除处理后的消息。

回答

2

考虑到您的流处理链是第一个主题的使用者。如果您出于某种原因需要重新处理原始数据(例如,如果您意识到流处理逻辑中存在错误),那么即使在处理完第一个主题后,您也可能希望获得原始消息。

因此,您不需要删除邮件,您必须在该主题上设置适合您需求的保留策略。折衷通常是数据可用时间与需要的存储量之间的关系。

1

无法手动从kafka中删除邮件(无法在磁盘上删除数据,AFAIK)。你有三种选择:

  • 使用基于时间的保留策略(例如让卡夫卡删除所有邮件自动年龄超过1小时)

  • 使用基于存储的保留策略(让卡夫卡保持话题尺寸是一些预定义的值)

  • 使用主题压缩策略 - 让kafka保留您的密钥的最新版本。所有旧版本的密钥将被删除(压缩)。

正如Luciano Afranllie所述 - 您不需要手动删除消息。您可以处理消息并让卡夫卡根据您的策略管理主题。