2017-09-21 59 views
0

我是RxJava的新手,所以我仍然试图让自己的头靠近它。我有一个Observable代表一串按钮点击,所以它很热。每次点击该按钮时,我都想做一些I/O。如果失败,我想重复并尝试再次执行该I/O,直到成功。这似乎是一个使用retry()repeat()的好机会,但那些只能用于热门的可观察对象,而不是冷的。正确的方式来重复发射事件流的操作

下面是一些伪代码来获得在我想要做的事:

buttonRequests 
    .map(actionEvent -> doIO()) 
    .repeatAboveIfFailedUntilIOSucceeds() 
    .subscribe(...); 

我使用flatMap复制的事件考虑,也比使用skip忽略其余的,如果它成功了,但不会干净地给我一个不确定的尝试次数。

什么是正确的方式来思考这个问题?

+0

更多的例子你想重复一个失败的动作为每个按钮点击,直到成功?如果它正在重试并且新的游戏会发生什么? –

+0

是的,我会重试每个按钮单击操作,直到它成功。我会禁用该按钮,以防止它在继续时被重新点击。 – Vultan

回答

1

请看看测试。在每个事件上,一个新的IO请求将被触发。 Switch-Map就像Flat-Map一样,但是当新的上游事件进入时,它将退订最近的订阅。如果你正在使用并发性,Flat-Map将会开始一个新的。因此,让我们假设你的热点observable激发了一个事件,flatMap开始在另一个线程(subscribeOn)上执行你的IO工作。如果有另一个事件进入,而最后一个事件仍在执行,它将开始执行另一个IO任务。 Switch-Map将取消订阅最后一个,并为当前事件启动一个。让我们看看retry() - 操作符。重试将重新订阅'ioWorkWrapped'提供的观察值,直到observable完成onComplete。这可能是非常危险的,因为想象一下每次尝试都会失败。它会永远旋转。建议使用'exponential-backoff'并在X尝试后提供备份可观察失败。对于“retryWhen”的使用,请看看这本优秀著作: Reactive Programming with RxJava

public class LibraryTest { 
    private AtomicInteger idx; 

    @Before 
    public void setUp() throws Exception { 
     idx = new AtomicInteger(0); 
    } 

    @Test 
    public void name() throws Exception { 
     Observable<String> stringObservable = Observable.just(1) 
       .switchMap(integer -> ioWorkWrapped() 
         .doOnError(throwable -> System.out.println("Something went wrong.")) 
         .retry() 
       ); 

     stringObservable.test() 
       .await() 
       .assertResult("value"); 


    } 

    private Observable<String> ioWorkWrapped() { 
     return Observable.defer(() -> { 
      try { 
       Thread.sleep(500); // IO Work 
       if (idx.getAndIncrement() < 5) { // for testing... 
        return Observable.error(new IllegalStateException("Wurst")); 
       } 
       return Observable.just("value"); 
      } catch (Exception ex) { 
       return Observable.error(ex); 
      } 
     }); 
    } 
} 
+0

谢谢!这正是我正在寻找的。我已经能够将这个想法整合到我的代码中;我也通过它学到了很多东西。 – Vultan

0

您需要使用运营商retryWhen的情况下,你的I/O操作失败,你可以扔掉了在运营商签一个Runnable例外。如果您遇到这种类型的异常,请重试。

在这个例子中,我们将重试4次。但是这种情况可以通过我们收到的可丢弃类型来改变。

int count=0; 

@Test 
public void retryWhenConnectionError() { 
    Subscription subscription = Observable.just(null) 
      .map(connection -> { 
       System.out.println("Trying to open connection"); 
       connection.toString(); 
       return connection; 
      }) 
      .retryWhen(errors -> errors.doOnNext(o -> count++) 
          .flatMap(t -> count > 3 ? Observable.error(t) : 
            Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), 
        Schedulers.newThread()) 
      .subscribe(s -> System.out.println(s)); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); 
} 

您可以在这里看到https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

+0

谢谢你我发现了另一个更好的符合我需求的回答,但我也花了一些时间来处理这个问题,这有助于提高我的理解。 – Vultan

相关问题