2016-08-23 129 views
2

我希望创建一个自定义消息处理程序来使用流中的检查点。此外,这些检查点将存储在ElasticSearch如何在Spring Integration自定义消息处理程序中自动装入Bean?

我创建了一个类检查点

@Component 
public class Checkpoint { 

    public static final String TASK_HEADER_KEY = "task"; 

    public static CheckpointMessageHandlerSpec warn(String message) { 
     return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message)); 
    } 
} 
// ... methods omitted: error, info etc 

接下来,我创建CheckpointMessageHandlerSpec

public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> { 

    public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) { 
     this.target = checkpointHandler; 
    } 

    public CheckpointMessageHandlerSpec apply(Message<?> message) { 
     this.target.handleMessage(message); 
     return _this(); 
    } 

    @Override 
    protected CheckpointHandler doGet() { 
     throw new UnsupportedOperationException(); 
    } 
} 

CheckpointHandler,在这个类我想注入的东西,比如服务或资料库来自Spring的数据:

public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler { 

    private String status; 
    private String message; 

    // I want inject services or repositories here 

    public CheckpointHandler(String status, String message) { 
     this.status = status; 
     this.message = message; 
    } 

    @Override 
    public void handleMessage(Message<?> message) { 
     // Test to watch if I have the bean factory. It is always null 
     this.getBeanFactory(); 

     Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'"); 

     // Here I intend to persist information of payload/headers with spring-data-elasticSearch repository previously injected 
     Object obj = expression.getValue(message); 
    } 
} 

最后,使用的例子,一个流之内:

@Bean 
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) { 
    return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow")) 
      .enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName)) 
      .handle(new AppendMessageHandler()) 
      .wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m))) 
      .handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload())) 
      .get(); 
} 

private class AppendMessageHandler implements GenericHandler { 

    @Override 
    public String handle(Object payload, Map headers) { 
     return new StringBuilder().append(testMessage).toString(); 
    } 
} 

我错过?有可能这样做吗?我在这个问题后有这个想法How to create custom component and add it to flow in spring java dsl?

谢谢!

回答

1

Bean可以自动布线,如果它们是,那么就是bean。

让我们再看看你的代码!

c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)) 

真正的豆在这里正是Lambda :)。当然,可悲,但不是你的自定义工厂,随后apply()。您的自定义代码将在每个传入消息的目标Lambda中完全调用,但不知道BeanFactory

要解决你的问题,你应该用你的工厂如:

.wireTap(c -> c.handle(checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '"))) 

和框架,将需要大约为豆你注册并CheckpointHandler因此,自动装配照顾。

正如你可能已经猜到,你不需要apply()方法。仅仅因为需要区分汇编阶段,当Java DSL填充bean的树时。初始化和注册阶段,当这个树被框架分析并且bean被注册在应用程序上下文中时。最后,还有一个运行阶段,即当消息从一个通道传送到另一个通道时,虽然所有这些消息处理程序,变换器等都有。