2017-08-17 126 views
1

我使用Spring集成的DSL实现。 我有下面的代码,我不能使用我的自定义错误流。当authenticate方法抛出运行时异常时,errorChannel开始处理。我丰富头文件来使用我的自定义错误流,但不能使用。弹簧集成DSL自定义错误通道不起作用

// In Class - 1 
@Bean 
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) { 

     MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway(); 
     wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input")); 
     wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input")); 
     wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorChannel")); 
     wsInboundGateway.setMarshaller(marshaller); 
     wsInboundGateway.setUnmarshaller(marshaller); 
     return wsInboundGateway; 
    } 


// In Class - 2 
@Bean 
    public IntegrationFlow incomingRequest() { 
     return f -> f.<Object, Class<?>>route(t -> t.getClass(), 
       mapping -> mapping.subFlowMapping(payloadType1(), 
         sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional)) 
         .subFlowMapping(payloadType2(), 
           sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)), 
         conf -> conf.id("router:Incoming request router")); 
    } 

// In Class - 3 
    @Bean 
    public IntegrationFlow type1() { 
     IntegrationFlow integrationFlow = f -> f 
       .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "error222", true)) 
       .<Type1>handle((p, h) -> authentication.authenticate(p), 
         conf -> conf.id("service-activator:Authenticate")) 
       .transform(transformer::transformType1MsgToDataX, 
         conf -> conf.id("transform:Unmarshall type1 Message")) 
       .enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id") 
         .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType")) 
       .handle((GenericHandler<DataX>) repository::successResponseMessage, 
         conf -> conf.id("service-activator:return success")) 
       .channel("outgoingResponse.input") 
       ; 

     return integrationFlow; 
    } 

// In Class - 3 
@Bean 
    public IntegrationFlow error222Flow() { 

     return IntegrationFlows.from("error222").handle("repository", "failureResponseMessage").get() 

       ; 

    } 

编辑:

阿尔乔姆的答案后,我的代码如下图所示。但是,我仍然无法访问错误流中的标题参数。我得到的错误 - “没有通道通过路由器解决‘路由器:错误响应准备’”

// In Class - 1 
@Bean 
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) { 

     MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway(); 
     wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input")); 
     wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input")); 
     wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorResponse.input")); 
     wsInboundGateway.setMarshaller(marshaller); 
     wsInboundGateway.setUnmarshaller(marshaller); 
     return wsInboundGateway; 
    } 


// In Class - 2 
@Bean 
    public IntegrationFlow incomingRequest() { 
     return f -> f.<Object, Class<?>>route(t -> t.getClass(), 
       mapping -> mapping.subFlowMapping(payloadType1(), 
         sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional)) 
         .subFlowMapping(payloadType2(), 
           sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)), 
         conf -> conf.id("router:Incoming request router")); 
    } 

// In Class - 2 
@Bean 
public IntegrationFlow errorResponse(){ 
    return f -> f.<MessageHandlingException, Object>route(t -> t.getFailedMessage().getHeaders().get("ABCDEF"), 
         mapping -> mapping.subFlowMapping("ABCDEF", 
           sf -> sf.gateway("customError.input", ConsumerEndpointSpec::transactional)), 
           conf -> conf.id("router:error response prepare")); 
} 

// In Class - 3 
    @Bean 
    public IntegrationFlow type1() { 
     IntegrationFlow integrationFlow = f -> f 
       .enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true)) 
       .<Type1>handle((p, h) -> authentication.authenticate(p), 
         conf -> conf.id("service-activator:Authenticate")) 
       .transform(transformer::transformType1MsgToDataX, 
         conf -> conf.id("transform:Unmarshall type1 Message")) 
       .enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id") 
         .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType")) 
       .handle((GenericHandler<DataX>) repository::successResponseMessage, 
         conf -> conf.id("service-activator:return success")) 
       .channel("outgoingResponse.input") 
       ; 

     return integrationFlow; 
    } 

// In Class - 3 
@Bean 
    public IntegrationFlow customError(){ 
     return f -> f.handle((GenericHandler<MessageHandlingException>)eventRepository::failureResponseMessage, 
           conf -> conf.id("service-activator:return failure")); 
    } 

编辑 - 2:

我尝试阿尔乔姆的测试代码,它工作在这种情况下。如果我将type1流转换为子流映射如下(我这样做,因为我怀疑我的子流代码块),错误流不能打印ABCDEF参数值。 之后,我将另一个标题(XYZTWR)添加到子流映射中,但不能打印。

@Bean 
public IntegrationFlow type1() { 
    return f -> f.<String, String>route(t -> t.toString(), mapping -> mapping.subFlowMapping("foo", 
      sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional).enrichHeaders(h -> h.header("XYZTRW", "XYZTRW", true)))); 
} 

@Bean 
public IntegrationFlow fooFlow() { 
    return f -> f.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true)) 
      .handle((p, h) -> { 
       throw new RuntimeException("intentional"); 
      }); 
} 

我S.OUT是:

GenericMessage [payload=foo, headers={history=testGateway,type1.input, id=1fad7a65-4abe-c41d-0b22-36839a103269, timestamp=1503029553071}] 
+0

我用spring集成4.3.11和spring整合java dsl 1.2.2。 – user2286211

回答

0

errorChannel头开始工作时,我们转移消息发送给不同的线程执行人或队列信道。否则标准throwtry...catch在相同的调用堆栈中工作。

因此,在您的情况下,认证异常只是引发到调用者 - WS入站网关。在这里你已经配置了全局错误通道。

我做了这个测试:

@Configuration 
@EnableIntegration 
@IntegrationComponentScan 
public static class ContextConfiguration { 

    @Bean 
    public IntegrationFlow errorResponse() { 
     return IntegrationFlows.from(errorChannel()) 
        .<MessagingException, Message<?>>transform(MessagingException::getFailedMessage, 
          e -> e.poller(p -> p.fixedDelay(100))) 
        .get(); 
    } 

    @Bean 
    public IntegrationFlow type1() { 
      return f -> f 
        .enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true)) 
        .handle((p, h) -> { throw new RuntimeException("intentional"); }); 
    } 

    @Bean 
    public PollableChannel errorChannel() { 
     return new QueueChannel(); 
    } 
} 

@MessagingGateway(errorChannel = "errorChannel", defaultRequestChannel = "type1.input") 
public interface TestGateway { 

    Message<?> sendTest(String payload); 

} 

... 

@Autowired 
private TestGateway testGateway; 

@Test 
public void testErrorChannel() { 
    Message<?> message = this.testGateway.sendTest("foo"); 
    System.out.println(message); 
} 

而且我SOUT显示我:

GenericMessage [payload=foo, headers={ABCDEF=ABCDEF, id=ae5d2d44-46b7-912d-17d4-bf2ee656140a, timestamp=1502999446725}] 

请,请为org.springframework.integration类别DEBUG日志记录级别,并观察在该步骤中你的消息是失去希望的头。

UPDATE

确定。我看到你的问题。因为您使用的是sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional),换句话说,您通过网关呼叫下游,您所做的所有事情都在门后,并且只有在发送错误的情况下才能返回,这是网关的请求消息。下游failedMessage被默认吞下。

要解决这个问题,您应该考虑为该.gateway()增加一个errorChannel()选项,并处理那里的下游错误。或者...只是不要在路由器的子流程中使用.gateway(),而是简单地使用channel映射。

.transactional()也可以配置在任何.handle()

+0

谢谢。我明白,但我的另一个问题是,我不能传递一个参数(如标题地图值)到全局错误通道来决定我的自定义配置。 – user2286211

+0

您可以改为在“MarshallingWebServiceInboundGateway”级别上使用自定义错误频道。 –

+0

如果我可以将头参数传递给全局错误通道,它可以帮助我,但我不能。有没有办法呢?我需要启动流名称信息来决定全局错误通道的过程。 – user2286211