假设我有两个发现特殊文件名(例如,可以是任何东西)的Verticle并将它们发布到事件总线例如,一个是从一个REST API读取名和另一名来自文件系统:终止并从事件总线中获取结果
ScanRestVerticle.java
/**
* Reads file names through an REST API
*/
public class ScanRestVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL");
req.toObservable()
.flatMap(HttpClientResponse::toObservable)
.lift(unmarshaller(Model.class))
.subscribe(c -> vertx.eventBus().publish("address", c.specialName()));
req.exceptionHandler(Throwable::printStackTrace);
req.end();
}
}
ScanFsVerticle.java
/**
* Reads file names from a file
*/
public class ScanFsVerticle.java extends AbstractVerticle {
@Override
public void start() throws Exception {
StringObservable.byLine(vertx.fileSystem()
.rxReadFile("myFileNames.txt")
.map(Buffer::toString)
.toObservable())
.subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage()));
}
}
一切的伟大工程,在这里,但现在,我有一个将事件总线中的这些名称组合在一起并在STD.OUT上打印它们的Verticle:
PrintVerticle.java
public class PrintVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
vertx.eventBus()
.<JsonObject>consumer("address")
.bodyStream()
.toObservable()
.reduce(new JsonArray(), JsonArray::add)
.subscribe(j -> System.out.println(j.toString()));
}
的问题是,减少这里从来没有真正完成,因为事件总线正在无限流,因为我认为。
那么我该如何真正完成这个操作并打印两个Verticle发布的名字?
注:我在vert.x和RX真正的新和我可能会丢失一些作品或得到的东西错了,所以请不要”法官:)
在此先感谢。
编辑:我可以打电话scan()
而不是reduce()
在这里,以获得中间结果,但我怎么会得到scan().last()
?
什么是您的完成定义?如果你想收集2个来源发布的每2个名字,那么你需要区分2个事件 – yosriz
感谢您的问题!正如你可以在例子中看到的,我想从两个“扫描”垂直收集所有名称,并将它们加入到“打印”垂直中......当然,这是一个简化的例子。 –