2016-05-13 281 views

Spring Cloud Stream @ServiceActivator does not send the message to errorChannel on exception

I am using Spring Cloud Stream via spring-cloud-starter-stream-kafka. I have bound my channels to Kafka topics in application.properties as follows:

spring.cloud.stream.bindings.gatewayOutput.destination=received 
spring.cloud.stream.bindings.enrichingInput.destination=received 
spring.cloud.stream.bindings.enrichingOutput.destination=enriched 
spring.cloud.stream.bindings.redeemingInput.destination=enriched 
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed 
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed 
spring.cloud.stream.bindings.error.destination=errors12 
spring.cloud.stream.bindings.errorInput.destination=errors12 
spring.cloud.stream.bindings.errorOutput.destination=errors12 

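I cannot get my program to publish the exception message to the error channel. Surprisingly, it does not even seem to try, even though the failure happens on a different thread (I have a @MessagingGateway that dumps messages into gatewayOutput, and the rest of the flow then runs asynchronously). Here is the definition of my ServiceActivator: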

@Named 
@Configuration 
@EnableBinding(Channels.class) 
@EnableIntegration 
public class FulfillingServiceImpl extends AbstractBaseService implements
        FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {

        logger.error("FULFILLED!!!!!!");

        throw new Exception("test exception");

    }
}

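Here are the generated logs (I have truncated the full exception). There are no...

  • complaints about errorChannel having no subscribers
  • Kafka producer thread log entries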
 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud[email protected]2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.spr[email protected]64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {} 
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {} 
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {} 
... 
... 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 

Edit: here is the content of my Channels class:

public interface Channels {

public static final String GATEWAY_OUTPUT = "gatewayOutput"; 

public static final String ENRICHING_INPUT = "enrichingInput"; 
public static final String ENRICHING_OUTPUT = "enrichingOutput"; 

public static final String REDEEMING_INPUT = "redeemingInput"; 
public static final String REDEEMING_OUTPUT = "redeemingOutput"; 

public static final String FULFILLING_INPUT = "fulfillingInput"; 
public static final String FULFILLING_OUTPUT = "fulfillingOutput"; 

@Output(GATEWAY_OUTPUT) 
MessageChannel gatewayOutput(); 

@Input(ENRICHING_INPUT) 
MessageChannel enrichingInput(); 

@Output(ENRICHING_OUTPUT) 
MessageChannel enrichingOutput(); 

@Input(REDEEMING_INPUT) 
MessageChannel redeemingInput(); 

@Output(REDEEMING_OUTPUT) 
MessageChannel redeemingOutput(); 

@Input(FULFILLING_INPUT) 
MessageChannel fulfillingInput(); 

@Output(FULFILLING_OUTPUT) 
MessageChannel fulfillingOutput();
}

Answers


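You don't show your Channels class, but the binder doesn't know that your "error" channel is "special".

The binder can be configured to retry and to send failed messages to a dead-letter topic; see this PR in 1.0.0.RELEASE.

Alternatively, you can add a "mid-flow" gateway in front of the service activator. Think of it as a "try/catch" block in Java: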

@MessageEndpoint 
public static class GatewayInvoker { 

    @Autowired 
    private ErrorHandlingGateway gw; 

    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT) 
    public void send(Message<?> message) { 
        this.gw.send(message);
    } 

} 

@Bean 
public GatewayInvoker gate() { 
    return new GatewayInvoker(); 
} 

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS) 
public interface ErrorHandlingGateway { 

    void send(Message<?> message); 

} 

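Change the input channel of your service activator to toService.

You must add @IntegrationComponentScan to your configuration class so that the framework can detect the @MessagingGateway interface and build a proxy for it.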
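To make the "try/catch" analogy concrete, here is a minimal plain-Java sketch of what the mid-flow gateway does conceptually. It is not Spring Integration code: the class MidFlowGatewaySketch and the Failure and Handler types are hypothetical names invented for illustration, and the real gateway proxy does more (for example, it wraps the failure in an error message).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java analogy only; none of these types are Spring Integration classes.
final class MidFlowGatewaySketch {

    /** Hypothetical stand-in for an error message: the failed payload plus its cause. */
    static final class Failure {
        final Object payload;
        final Exception cause;

        Failure(Object payload, Exception cause) {
            this.payload = payload;
            this.cause = cause;
        }
    }

    /** A handler that may throw, like the fulfill(...) method in the question. */
    interface Handler {
        void handle(Object payload) throws Exception;
    }

    private final Handler service;           // plays the role of the "toService" subscriber
    private final Consumer<Failure> errors;  // plays the role of the error channel

    MidFlowGatewaySketch(Handler service, Consumer<Failure> errors) {
        this.service = service;
        this.errors = errors;
    }

    /** The "try/catch" scope: downstream failures are diverted instead of propagating. */
    void send(Object payload) {
        try {
            service.handle(payload);
        } catch (Exception e) {
            errors.accept(new Failure(payload, e));
        }
    }

    public static void main(String[] args) {
        List<Failure> captured = new ArrayList<>();
        MidFlowGatewaySketch gateway = new MidFlowGatewaySketch(
                payload -> { throw new Exception("test exception"); },
                captured::add);

        gateway.send("order-1");
        System.out.println(captured.size() + " failure(s): " + captured.get(0).cause.getMessage());
    }
}
```

Everything invoked inside send(...) is covered by the catch block, which is why the service activator has to be moved behind the gateway's request channel.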

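Edit

It just occurred to me that another approach is to add an ExpressionEvaluatingAdvice (the Spring Integration class is ExpressionEvaluatingRequestHandlerAdvice) to your service activator's advice chain.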
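The advice-based idea can be pictured as a decorator around the handler. The following is a rough plain-Java sketch of that pattern, not the Spring Integration advice API: FailureAdviceSketch, advise(...), and the trapException flag are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Plain-Java analogy only; this is not the Spring Integration advice API.
final class FailureAdviceSketch {

    /** A handler that may throw, like the fulfill(...) method in the question. */
    interface Handler {
        void handle(Object payload) throws Exception;
    }

    /**
     * Wraps a handler so that onFailure runs whenever the target throws.
     * If trapException is false the exception is rethrown after the callback,
     * so the caller (for example, a retrying consumer) still sees the failure.
     */
    static Handler advise(Handler target,
                          BiConsumer<Object, Exception> onFailure,
                          boolean trapException) {
        return payload -> {
            try {
                target.handle(payload);
            } catch (Exception e) {
                onFailure.accept(payload, e);
                if (!trapException) {
                    throw e;
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        List<String> errorLog = new ArrayList<>();
        Handler fulfill = payload -> { throw new Exception("test exception"); };

        Handler advised = advise(fulfill,
                (payload, e) -> errorLog.add(payload + " failed: " + e.getMessage()),
                true);

        advised.handle("order-1"); // the exception is trapped; only the callback fires
        System.out.println(errorLog);
    }
}
```

Unlike the gateway, the advice is attached to one handler only, so it leaves the channel wiring untouched; whether the exception is swallowed or rethrown decides if any outer retry logic still sees it.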


Also, follow the progress at https://github.com/spring-cloud/spring-cloud-stream/issues/538 for a future solution in Spring Cloud Stream.


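Thanks for the comprehensive answer; I am still digesting it. I have added my Channels.class, and I like the suggestion of the advice-based approach, since I am also interested in stateful retry behavior. A few points in your answer confuse me. You mentioned [the binder can be configured to retry and send exceptions to a dead-letter topic; see this PR in 1.0.0.RELEASE]. I am using 1.0.0.RC2, which seems to predate the PR. When will 1.0.0.RELEASE be released?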


[It was released last week](https://spring.io/blog/2016/05/10/spring-cloud-stream-1-0-0-release-is-available).
