2016-11-05 121 views
1

如何捕捉写入卡夫卡主题的错误?春云流发送给卡夫卡错误控制处理

import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.annotation.StreamListener; 
import org.springframework.cloud.stream.messaging.Processor; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.handler.annotation.SendTo; 
import org.springframework.stereotype.Component; 

import javax.xml.bind.JAXBException; 

@Component 
@EnableBinding(Processor.class) 
public class Parser { 

    @StreamListener(Processor.INPUT) 
    @SendTo(Processor.OUTPUT) 
    public String process(Message<?> input) { 

     // do smth 

     // how to catch an error if sending message to 'output' topic fails 
    } 

} 

将生产者切换到同步模式。

spring.cloud.stream.kafka.bindings.output.producer.sync=true 

下一步是什么?任何例子?扩展一些绑定impl,添加一些AOP魔法?

回答

0

我想你需要的是注册一个KafkaProducerListener,它处理你的情况下的错误情况,并使它在应用程序中作为bean可用。

有关KafkaProducerListener更多信息是here

另外,还要注意平时的失败的信息将可在errorChannel具有频道名称error。一旦您为错误通道配置了destination名称,即所有错误消息都会发布:spring.cloud.stream.bindings.error.destination

+0

感谢您的输入。我真的希望我的目标能够通过Spring Cloud Stream实现。 – user3665549

+0

您是否找到使用Spring云流的解决方案?关于错误处理的文档非常少,令人惊讶的是,我在我的配置文件中设置了目标错误通道,但没有发布错误信息。 –

+0

嗨,这是在这里解决:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/125 –