当我将enable.auto.commit设置为false并尝试使用基于注释的spring-kafka @KafkaListener手动提交偏移时,我得到一个org.springframework.kafka .listener.ListenerExecutionFailedException:监听方法无法进入的消息被调用Acknowledgement.acknowledge()在spring-kafka中抛出异常@KafkaListener
我有一个非常简单的代码如下:
@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
System.out.println("Received Messasge in group 'foo': " + message);
// TODO: Do something with the message
}
当我从生产者发送消息,我得到以下例外:
org.springframework.kafka.listener.ListenerExecutionFailedException:传入消息无法调用Listener方法。
端点处理细节:
方法[公共无效COM **** ***** ******* KafkaMessageListener.listenFooGroup(java.lang.String中,org.springframework。。 .kafka.support.Acknowledgement)]
Bean [com.****.*****.*******[email protected]];嵌套异常是org.springframework.messaging.converter.MessageConversionException:无法处理消息;嵌套异常是org.springframework.messaging.converter.MessageConversionException:对于GenericMessage,无法从[java.lang.String]转换为[org.springframework.kafka.support.Acknowledgegment] [payload = test,headers = {kafka_offset = 57,kafka_receivedMessageKey = NULL,kafka_receivedPartitionId = 0,kafka_receivedTopic = demotopic}], failedMessage = GenericMessage [有效载荷=测试,标头= {kafka_offset = 57,kafka_receivedMessageKey = NULL,kafka_receivedPartitionId = 0,kafka_receivedTopic = demotopic}]
请帮助。 TIA。
2.0将[抛出一个不太晦涩例外]设置
....ackMode
属性(https://github.com/spring-projects/spring-kafka/pull/356) - 感谢您指出了这一点。 –它现在不会抛出异常。非常感谢。感谢你的帮助。 –
下一个版本将抛出[更有意义的例外](https://github.com/spring-projects/spring-kafka/pull/356)。 'new IllegalStateException(“没有确认可用作参数,监听器容器必须有一个MANUAL Ackmode来填充确认。”,'。谢谢你指出。 –