2014-12-05 48 views
0

我累的链接查看未处理的邮件使用简单的消费

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

使用SimpleConsumer消耗的消息之后,但在使用它,我发现了一些突然的行为如下:

消费者正在使用特定分区的消息。但问题是,当我的消费者正在运行并使用生产者将消息推送到主题时,它会使用该分区的消息。但是,如果我的消费者目前没有运行,并且将某些消息推送到该主题并再次启动消费者,则消费者不会消费由生产者推送的消息,而是再次准备消费现在将被推送的消息。我正在使用LatestTime()而不是EarliestTime(),因为我只想使用未处理的消息。

例如

案例-1

消费者正在运行:

监制推M1,M2,M3的消息来划分主题的11

结果是:消费者将消耗所有三个消息。

案例 - 2

消费者没有运行

制片人现在推M4,M5 M6 messgae分区主题1现在1名

消费者调用

结果:消费者DONOT消费messgaes m4,m5,m6但是如果我将检查偏移量,那么它将被设置为7.这意味着生产者已经将偏移量提前到7,同时产生消息,结果消费者将消耗来自偏移量7的消息

当消费者再次出现时,请理想地帮助他们阅读来自m4的消息。

回答

0

你做错了。首先我不确定SimpleConsumer是你在找什么。它迫使你自己管理偏移量(例如,它根本不会向Zookeeper提交偏移量,并且每次你再次启动一个SimpleConsumer时它将再次获取相同的消息)。 SimpleConsumer不理解“已处理的消息”。它所能做的只是从一些偏移量开始取回,并继续提取,直到你说“停止”。

无论如何,如果你打算提交自己处理的偏移量,你应该使用EarliestTimeauto.offset.reset=smallest配置项)。 auto.offset.reset意味着,如果你的消费与错误的偏移初始化(和SimpleConsumer被初始化-1如果我没有记错的偏移,这显然是错误的),它会重置抵消要么smallest可用(EarliestTime)或largest可用(LatestTime)。

这样可以很清楚这里的例子:

Case-1

创建一个消费者,当它与最初错误的偏移初始化它指向的话题1分区1,它会问经纪人为一些适当的偏移量(这里是smallestlargest偏移量重置进来)。如果您还没有发出任何消息,则smallestlargest偏移量都将为0,因此当您生成一些消息时,消费者将获取这些消息。

Case-2

你产生N个消息(比如7)。然后你开始你的SimpleConsumer。再次,它用错误的偏移量进行初始化,并要求代理商进行适当的抵消。使用smallest重置抵消将它为0,并与largest抵消它将是7。正如在你的例子中你使用LargestOffsets你的消费者将重新初始化偏移7并开始消费它。

一般来说,看看高层次的消费者,在大多数情况下,这就是你正在寻找的。 这里的link

+0

但是,如果我使用HighLevelConsumer,那么它不会提供给我一种将消费者关联到toipc中的特定分区的方式。我的要求是 - 我有n个生产者分别连接到特定分区,每个连接到特定分区的n个消费者。分区是在一些关键的基础上完成的。所以我在这种情况下选择了SimpleConsumer。 – 2014-12-05 09:19:42

+0

@ajaymittal那么不幸的是,你必须自己管理偏移量。高级用户不提供配置来使用特定的分区。 – serejja 2014-12-05 09:23:07

+0

在使用SimpleConsumer时,我只能使用EarliestTime()作为0或latestTime()给出最近的偏移量。有没有办法获得偏移点到未处理消息的索引。 – 2014-12-05 09:26:14