2017-07-28 66 views
0

我有一个基于微服务的应用程序,它从Kafka主题读取消息。当服务关闭时,如果在主题上有任何消息正在写入,我希望消费者在下次启动并运行时阅读这些消息。但是当服务停止时,我错过了所有的消息。如何让消费者读取服务关闭时未读取的消息?与Kafka的春天云流 - 重新启动消费者后邮件未被读取

当我的微服务启动并且任何消息正在返回到主题时,我正在收到所有消息。

我application.properties:

spring.cloud.stream.bindings.input.destination=test 
spring.cloud.stream.bindings.input.consumer.headerMode=raw 
spring.cloud.stream.bindings.input.consumer.startOffset=latest 
spring.cloud.stream.bindings.input.consumer.resetOffsets=true 
spring.cloud.stream.bindings.input.consumer.instanceCount=3 
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false 

//这是我的微服务的根目录

@EnableBinding(Sink.class) 
public class Consumer { 
    @ServiceActivator(inputChannel = Sink.INPUT) 
    public void consoleSink(Object payload){ 
     logger.info("Type: "+ payload.getClass() + " which is byte array"); 
     logger.info("Payload: " + new String((byte[])payload)); 
    } } 

我明白任何线索来解决这个问题,在我的消费者的代码。

回答

0

设置下面的属性帮助我解决了我的问题。

spring.cloud.stream.bindings.input.destination=test 
spring.cloud.stream.bindings.input.consumer.headerMode=raw 
spring.cloud.stream.bindings.input.consumer.startOffset=latest 
spring.cloud.stream.bindings.input.consumer.resetOffsets=true 
spring.cloud.stream.bindings.input.consumer.instanceCount=3 
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false 
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false 
spring.cloud.stream.kafka.binder.autoCreateTopics=false 
spring.cloud.stream.bindings.input.group=testGroup50 
spring.cloud.stream.bindings.input.partitioned=false 

感谢,

BR