2017-07-29 59 views
0

IM使用BlockingObservable存在难题。我有一个情况,我需要回调完成,并在完成后,我可以做一个翻新API调用来获取数据。具体而言,我需要首先初始化支付网关sdk,然后在成功完成初始化之后,我将进行翻新调用。以下是我迄今为止:rxjava2 - 如何阻止直到回拨完成,然后进行翻新呼叫?

Observable.fromCallable(new Callable<PaymentStrategy>() { 

        @Override 
        public PaymentStrategy call() throws Exception { 
         return gatewayFactory.getPaymentStrategy("US"); 
        }}).flatMap(new Function<PaymentStrategy, ObservableSource<PaymentStrategy>>() { 
        @Override 
        public ObservableSource<PaymentStrategy> apply(@NonNull final PaymentStrategy paymentStrategy) throws Exception { 
         return Observable.fromCallable(new Callable<PaymentStrategy>() { 
          @Override 
          public PaymentStrategy call() throws Exception { 


     /*here is important. i want it to block until init actually 
    gets a call back. when it does the subscriber will call 
onComplete and the observable should move forward at that point*/ 


           paymentStrategy.init(paymentInitSubscriber); 
           return paymentStrategy; 
          } 
         }); 
        }}).observeOn(AndroidSchedulers.mainThread()) 
         .subscribeOn(AndroidSchedulers.mainThread()) 
         .subscribe(paymentInitSubscriber); 

似乎rxjava2没有toBlocking()调用,但我没有找到一个toBlockingFirst()等,并BlockingObservable类。但我不知道如何完成任务。所以要清楚,当我打电话给paymentStrategy.init()时,我需要observable才会阻止,直到调用订阅者onComplete或onNext。我将订阅者作为参数传递,以便回调知道在完成时调用它。有任何想法吗 ?

回答

0

我发现我最好使用Flowable并提供一个发射器。然后我可以发射像onNext和的onComplete等

final PaymentStrategy paymentStrategy = gatewayFactory.getPaymentStrategy("US"); 

      FlowableOnSubscribe flowableOnSubscribe = new FlowableOnSubscribe() { 
       @Override 
       public void subscribe(FlowableEmitter e) throws Exception { 
        FlowableEmitter initdownFlowableEmitter = e; 
        paymentStrategy.init(e); 
    /* above i pass in the emitter and i can call onNext when the call back i want completes. this pushes the stream forward to the next one below of the retrofit call */ 
       } 
      }; 

      final Flowable flowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER); 
      return flowable.flatMap(new Function() { 
       @Override 
       public Object apply(@NonNull Object o) throws Exception { 
        PaymentApi service = mRetrofit.create(PaymentApi.class); 
        return service.getCards(); 
       } 
      }).toObservable(); 

事件和改型类不要忘记使用可流动,而不是可观察到的。

似乎fromAsync已更名为Flowable.create按照GitHub的笔记here

实际上,1.x的fromEmitter(原fromAsync)已更名为Flowable.create。