2016-09-28 52 views
1

这里是链:observableOne.flatMap(event -> observableTwo).subscribe()以事件后,才从链可观察到的已发出一个

我想借此事件从observableOne的第一次它发出一个,则忽略此Observable所有其他事件直到observableTwo已发出一个值,完成或完成一个错误。值得一提的是,我最终对第二个Observable的事件感兴趣。

上下文相似,有一个Button点击哪个事件被触发,这是ObservableOne。被触发的事件触发ObservableTwo,假设它是一系列网络操作。所以我想在执行网络操作时忽略所有按钮点击。

+0

如果多次单击按钮,通常会忽略先前的网络操作 - 只会在最后一次按钮单击时获取网络操作的结果。为什么选择忽略按钮点击直到当前的网络操作完成? – Enigmativity

+0

你似乎已经结束了这个问题。为什么不在收到点击后禁用按钮,并在处理网络操作结果后重新启用它? – JohnWowUs

+0

这只是更复杂的逻辑的一部分,有一个原因 – Eugene

回答

0

来控制flatMap提出的请求量使用特殊的过载:

observableOne 
    .doOnRequest(System.out::println) 
    .flatMap(event -> observableTwo, 1) 
    .flatMap(event -> observableThree, 1) 
.subscribe() 

如果你的源代码observableOneobservableTwoobservableThree是同步的这不应该是必要的,但对于异步资源本应做的工作。

+0

我更新了描述,因为它不是很清楚。 – Eugene

0

[EDIT2]改变,所以我适应我的答案+实际上给一个正确的答案

我看不出这样做比用状态标志的其他方式的问题:

AtomicBoolean progress = new AtomicBoolean(); 
observableOne 
     .filter(event -> !progress.get()) 
     .flatMap(event -> 
       observableTwo 
         .doOnSubscribe(() -> progress.set(true)) 
         .doOnTerminate(() -> progress.set(false)) 
     ) 
     .subscribe(); 

如果发生错误时,您的订阅将被取消,即使再次单击该按钮,也不会再收到任何事件。

如果这不是你想要的,你可以:在错误回调

private void bindRemoteCalls() { 
    if (mySubscription != null) mySubscription.unsubscribe(); 
    AtomicBoolean progress = new AtomicBoolean(); 
    mySubscription = observableOne 
     .filter(event -> !progress.get()) 
     .flatMap(event -> 
       observableTwo 
         .doOnSubscribe(() -> progress.set(true)) 
         .doOnTerminate(() -> progress.set(false)) 
     ) 
     .flatMap(event -> observableTwo, 1) 
     .subscribe(
      data -> handleResponse(data), 
      error -> { 
       handleError(error); 
       bindRemoteCalls(); 
      } 
     ); 
} 
  • 使用onErrorReturn()(用doOnError()结合实际做一些事情)

    • 重新订阅

      AtomicBoolean progress = new AtomicBoolean(); 
      observableOne 
          .filter(event -> !progress.get()) 
          .flatMap(event -> 
            observableTwo 
              .doOnSubscribe(() -> progress.set(true)) 
              .doOnTerminate(() -> progress.set(false)) 
              .doOnError(error -> handleError(error)) 
              .onErrorReturn(null) 
              .filter(data -> data != null) 
          ) 
          .subscribe(data -> handleResponse(data)); 
      

    如果您需要,请记得使用subscribeOn()/observeOn()以及正确的调度程序。

    请考虑用switchMap()代替flatMap() - >如果再次按下该按钮,先前的呼叫被取消(取消订阅),然后开始新的呼叫。或者用Rx来表示:observableTwo先前的订阅已取消订阅,并且形成了新的订阅。

    如果您在取消订阅时禁用了您的按钮并在终止时启用了它,您可以通过使按钮不可点击来轻松获得相同的结果。

  • +0

    我更新了描述,因为确实很笨拙。 – Eugene

    +0

    对不起,我已经更新了我的答案,但它仍然不正确,额外的数字不工作,因为我期望:-(它只是延迟映射 –

    +0

    现在它是正确的,但需要一个状态标志,这意味着不是100%反应:) –

    相关问题