2015-12-05 128 views
0

由于我在堆栈中使用Vertx 3.1,因此我正在考虑使用这些工具带来的Future功能,但是在阅读完API后,对我来说这似乎相当有限。我甚至无法找到让未来等待Observable的方法。 这里我的代码Vertx Future不会等待

  public Observable<CommitToOrderCommand> validateProductRestrictions(CommitToOrderCommand cmd) { 
    Future<Observable<CommitToOrderCommand>> future = Future.future(); 
    orderRepository.getOrder(cmd, cmd.orderId) 
        .flatMap(order -> validateOrderProducts(cmd, order)) 
        .subscribe(map -> checkMapValues(map, future, cmd)); 
    Observable<CommitToOrderCommand> result = future.result(); 
    if(errorFound){ 
     throw MAX_QUANTITY_PRODUCT_EXCEED.create("Fail"/*restrictions.getBulkBuyLimit().getDescription())*/); 
    } 
    return result; 
} 

private void checkMapValues(Multimap<String, BigDecimal> totalUnitByRestrictions, Future<Observable<CommitToOrderCommand>> future, 
          CommitToOrderCommand cmd) { 
    for (String restrictionName : totalUnitByRestrictions.keySet()) { 
     Restrictions restrictions = Restrictions.valueOf(restrictionName); 
     if (totalUnitByRestrictions.get(restrictionName) 
            .stream() 
            .reduce(BigDecimal.ZERO, BigDecimal::add) 
            .compareTo(restrictions.getBulkBuyLimit() 
                  .getMaxQuantity()) == 1) { 
      errorFound = true; 
     } 
    } 
    future.complete(Observable.just(cmd)); 
} 

在我的第一个可观察I'm检查结果的onComplete,并完成后,当我做完以后才能解锁操作。 但我期待future.result不会阻止,直到future.complete被调用,因为我期待。相反,只是返回null。

任何想法这里有什么错?

问候。

回答

3

vertx future不会阻塞,而是会在注入结果时调用处理程序(请参阅setHandlerisComplete)。

如果代码的外层需要Observable,则不需要将其包装在Future中,只需返回Observable<T>即可。 Future<Observable<T>>没有什么意义,你在混合两种做异步结果的方法。

请注意,有些方法可以将Observable折叠到未来中,但难度在于Observable可能放出多个项目,而Future只能放置一个项目。您已经通过将您的结果收集到一张地图中来照顾到这一点。

由于这Observable永远只发出一个项目,如果你想有一个Future出来的,你应该subscribe它并调用future.complete(yourMap)onNext方法。还要定义一个onError处理程序,它将调用future.fail