2016-05-17 142 views
1

我已经浏览了spring-cloud-stream 1.0.0.RELEASE的文档,似乎找不到有关错误处理的任何文档。spring-cloud-stream kafka错误处理

基于kafka 0.9的观察,如果我的消费者抛出一个RuntimeException,我会看到3次重试。三个试之后,我看到这个在日志中:

2016-05-17 09:35:59.216 ERROR 8983 --- [ kafka-binder-] o.s.i.k.listener.LoggingErrorHandler  : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]] 

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message 

在这一点上,消费者偏移滞后1,如果我重新开始消费,消息被再次重试3次。但是,如果我然后将另一条消息发送到同一分区,以便消费者不会抛出异常,则会更新消费者偏移量,并且我们抛出异常的原始消息不会在重新启动后重试。

这是记录在哪里,我没有找到?错误处理绑定器是特定的,还是s-c-s抽象出在绑定器中保持一致?我怀疑这是消费者补偿如何使用kafka活页夹进行更新的意外后果。我看到一个enableDlq kafka消费者属性被添加了,我即将对此进行测试,但我不确定如何处理kafka中的死信。我熟悉rabbitmq中的死信队列,但通过rabbitmq,我们可以使用rabbitmq铲子插件重新发布并重试dlq消息,以涵盖发生故障的原因是临时服务中断。我不知道有任何类似的功能可用于kafka,但我们自己并没有编写类似的实用程序。

更新:启用enableDlq kafka使用者属性的测试显示与错误处理相同的使用者偏移问题。当消费者抛出一个RuntimeException时,我看到3次重试,之后没有记录错误消息,并且我看到一条消息发布到error.<destination>.<group>,但消费者偏移量没有更新并滞后1.如果我重新启动消费者,它会尝试再次从原始主题分区处理相同的失败消息,重试3次,并将相同的消息再次放入error.<destination>.<group>主题(重复的dlq消息)。如果我向另一个消息发布消息不会抛出RuntimeException的同一主题分区,则偏移量会更新,并且在重新启动时不再重试原始失败消息。

我认为当消费者抛出一个错误时,消费者应该更新kafka中的消费者偏移量,而不管enableDlq是否为真。这至少可以使得所有重试尝试失败的消息都被丢弃(当enableDlq为false时)或发布到dlq并从不重试(当enableDlq为true时)。

回答

1

看起来像一个错误,我 - 监听器容器具有不被暴露粘结剂(或设置)属性autoCommitOnErrorfalse默认情况下)。调用错误处理程序(发布错误)后,如果布尔值为true,则提交偏移量。

请在github作为问题报告。

+0

感谢您的确认。 https://github.com/spring-cloud/spring-cloud-stream/issues/542 – gadams00