2017-06-20 91 views
0

我试图将Spring集成中的异步消息传递网关与RxNetty(异步HTTP)结合起来。基本上我想要的是向调用线程返回observable/CompletableFuture,并在调用线程中使用Observable的zip/map/flatmap来创建一批出站HTTP调用。我只是想看看这是否可能。此外,如果不是使用Rxjava构造,我最好使用聚合器eip来构建简单的工作流程。Spring中的异步消息传递网关与RxNetty的集成

回答

1

由于4.1版的网关可以返回反应器2.0 Promise<?>

@MessagingGateway 
public static interface TestGateway { 

    @Gateway(requestChannel = "promiseChannel") 
    Promise<Integer> multiply(Integer value); 

} 

    ... 

@ServiceActivator(inputChannel = "promiseChannel") 
public Integer multiply(Integer value) { 
     return value * 2; 
} 

    ... 

Streams.defer(Arrays.asList("1", "2", "3", "4", "5")) 
      .get() 
      .map(Integer::parseInt) 
      .mapMany(integer -> testGateway.multiply(integer)) 
      .collect() 
      .consume(integers -> ...) 
      .flush(); 

与已更改为反应堆3.1 Mono5.0版本开始。

我很确定有一些适配器可以将这些类型转换为RxJava的有价值的东西。

CompletableFuture<?>因为4.2版本也支持网关:

CompletableFuture<String> process(String data); 

... 

CompletableFuture result = process("foo") 
    .thenApply(t -> t.toUpperCase()); 

... 

String out = result.get(10, TimeUnit.SECONDS); 

http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#async-gateway

+0

感谢@artem。这有助于 – Harry

相关问题