2017-04-24 40 views
1

假设我有两个发现特殊文件名(例如,可以是任何东西)的Verticle并将它们发布到事件总线例如,一个是从一个REST API读取名和另一名来自文件系统:终止并从事件总线中获取结果

ScanRestVerticle.java

/** 
* Reads file names through an REST API 
*/ 
public class ScanRestVerticle extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL"); 

     req.toObservable() 
      .flatMap(HttpClientResponse::toObservable) 
      .lift(unmarshaller(Model.class)) 
      .subscribe(c -> vertx.eventBus().publish("address", c.specialName())); 
     req.exceptionHandler(Throwable::printStackTrace); 
     req.end(); 
    } 
} 

ScanFsVerticle.java

/** 
* Reads file names from a file 
*/ 
public class ScanFsVerticle.java extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     StringObservable.byLine(vertx.fileSystem() 
      .rxReadFile("myFileNames.txt") 
      .map(Buffer::toString) 
      .toObservable()) 
      .subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage())); 
    } 
} 

一切的伟大工程,在这里,但现在,我有一个将事件总线中的这些名称组合在一起并在STD.OUT上打印它们的Verticle:

PrintVerticle.java

public class PrintVerticle extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     vertx.eventBus() 
      .<JsonObject>consumer("address") 
      .bodyStream() 
      .toObservable() 
      .reduce(new JsonArray(), JsonArray::add) 
      .subscribe(j -> System.out.println(j.toString())); 
} 

的问题是,减少这里从来没有真正完成,因为事件总线正在无限流,因为我认为。

那么我该如何真正完成这个操作并打印两个Verticle发布的名字?

注:我在vert.x和RX真正的新和我可能会丢失一些作品或得到的东西错了,所以请不要”法官:)

在此先感谢。

编辑:我可以打电话scan()而不是reduce()在这里,以获得中间结果,但我怎么会得到scan().last()

+0

什么是您的完成定义?如果你想收集2个来源发布的每2个名字,那么你需要区分2个事件 – yosriz

+0

感谢您的问题!正如你可以在例子中看到的,我想从两个“扫描”垂直收集所有名称,并将它们加入到“打印”垂直中......当然,这是一个简化的例子。 –

回答

1

你假设正确,reduce()运营商依靠onComplete()事件知道停止收集。
scan()是累加器,它为每个新的源流发射累积值,所以扫描会给你中间的结果。
这里的问题是,你的使用EventBus,概念上,EventBus是无限的流,所以不应该调用onComplete()。从技术上讲,当你使用EventBus时,可能会调用它,但我猜你不应该也不想依赖它。

  • 如果你想使用EventBus,你应该以某种方式结束你的PrintVerticle流,所以你必须onComplete()事件。例如,您可以使用EventBus发布一些发布每个观察到的顶点的事件结束。那么你可以压缩它并结束你的流(使用takeUntil()PrintVerticle在这2个事件发射后。
    事情是这样的:

    Observable vertice1EndStream = vertx.eventBus() 
        .<JsonObject>consumer("vertice1_end") 
        .bodyStream() 
        .toObservable(); 
    
    Observable vertice2EndStream = vertx.eventBus() 
        .<JsonObject>consumer("vertice2_end") 
        .bodyStream() 
        .toObservable() 
    
    Observable bothVerticesEndStream = Observable.zip(vertice1EndStream, vertice2EndStream, (vertice1, vertice1) -> new Object()); 
    
    vertx.eventBus() 
        .<JsonObject>consumer("address") 
        .bodyStream() 
        .toObservable() 
        .takeUntil(bothVerticesEndStream) 
        .reduce(new JsonArray(), JsonArray::add) 
        .subscribe(j -> System.out.println(j.toString())); 
    
  • 另一种选择,就是直接使用2流,而不是EventBus。将它们合并在一起,然后使用reduce(),因为那两个流是有限的,你不会有问题。

+0

感谢您的建议...我以为我会用'EventBus'去,但只使用'Observables'实际上更简单,我觉得它有点“纯”。 –