2017-06-27 4601 views
0

当我将enable.auto.commit设置为false并尝试使用基于注释的spring-kafka @KafkaListener手动提交偏移时,我得到一个org.springframework.kafka .listener.ListenerExecutionFailedException:监听方法无法进入的消息被调用Acknowledgement.acknowledge()在spring-kafka中抛出异常@KafkaListener

我有一个非常简单的代码如下:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory") 
public void listenFooGroup(String message, Acknowledgement ack) { 
    System.out.println("Received Messasge in group 'foo': " + message); 

    // TODO: Do something with the message 
} 

当我从生产者发送消息,我得到以下例外:

org.springframework.kafka.listener.ListenerExecutionFailedException:传入消息无法调用Listener方法。

端点处理细节:

方法[公共无效COM **** ***** ******* KafkaMessageListener.listenFooGroup(java.lang.String中,org.springframework。。 .kafka.support.Acknowledgement)]

Bean [com.****.*****.*******[email protected]];嵌套异常是org.springframework.messaging.converter.MessageConversionException:无法处理消息;嵌套异常是org.springframework.messaging.converter.MessageConversionException:对于GenericMessage,无法从[java.lang.String]转换为[org.springframework.kafka.support.Acknowledgegment] [payload = test,headers = {kafka_offset = 57,kafka_receivedMessageKey = NULL,kafka_receivedPartitionId = 0,kafka_receivedTopic = demotopic}], failedMessage = GenericMessage [有效载荷=测试,标头= {kafka_offset = 57,kafka_receivedMessageKey = NULL,kafka_receivedPartitionId = 0,kafka_receivedTopic = demotopic}]

请帮助。 TIA。

回答

3

您必须将容器工厂的containerProperties ackMode设置为MANUALMANUAL_IMMEDIATE以获得Acknowledgment对象。

在其他ack模式下,容器负责提交偏移量。

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE) 

或者,如果使用了Spring启动

+0

2.0将[抛出一个不太晦涩例外]设置....ackMode属性(https://github.com/spring-projects/spring-kafka/pull/356) - 感谢您指出了这一点。 –

+0

它现在不会抛出异常。非常感谢。感谢你的帮助。 –

+1

下一个版本将抛出[更有意义的例外](https://github.com/spring-projects/spring-kafka/pull/356)。 'new IllegalStateException(“没有确认可用作参数,监听器容器必须有一个MANUAL Ackmode来填充确认。”,'。谢谢你指出。 –