2017-03-16 108 views
2

工作这是一个非常简单的例子:RxJava不重试与观察

public static void main(String[] args) { 
    Observable.from(ListUtils.asList(1, 2, 3)).retry(3).subscribe(new Observer<Integer>() { 
     @Override 
     public void onCompleted() { 
      System.out.println("onCompleted"); 
     } 

     @Override 
     public void onError(Throwable e) { 
      System.out.println("onError"); 
     } 

     @Override 
     public void onNext(Integer integer) { 
      System.out.println(integer); 
      if (integer == 3) 
       throw new RuntimeException("onNext exception"); 

     } 
    }); 
} 

控制台输出为:1,2,3,onerror的。 但我的预期:1,2,3,1,2,3,1,2,3,onError。

回答

4

不是错误一旦发生从可观察的用户退订,并且如果你使用操作符重试将重试仅如果操作员没有在主要管道,但在flatMap操作

这一个,因为使用重试是flatMap后,将工作

@Test 
public void retryInFlatMap() { 
    Observable.from(Arrays.asList(1, 2, 3, 4)) 
      .flatMap(number -> Observable.just(number) 
        .doOnNext(n -> { 
         if (n == 2) { 
          throw new NullPointerException(); 
         } 
        })) 
        .retry(3) 
      .subscribe(n-> System.out.println("number:"+n)); 
} 

这一个,因为是一个地图后,它不会

int cont=0; 
@Test 
public void retryInMap() { 
    Observable.from(Arrays.asList(1, 2, 3, 4)) 
      .map(number ->{ 
         if (cont == 2) { 
          throw new NullPointerException(); 
         } 
         cont++; 
         return number; 
        }) 
      .retry(3) 
      .subscribe(n-> System.out.println("number:"+n)); 
} 

如果你想看到更多EXA mples看看这里https://github.com/politrons/reactive

0

retry()运营商重新订阅并执行上面的过程时,observable发出错误。 请参阅this

我写了示例代码。

Observable.from(Arrays.asList(1, 2, 3, 4)) 
    .flatMap(new Func1<Integer, Observable<Integer>>() { 
     @Override public Observable<Integer> call(Integer integer) { 
      if(integer == 4) { 
       return Observable.error(new Exception()); 
      } 
      return Observable.just(integer); 
     } 
    }) 
    // When error occurred, re-subscribe and execute above process 
    // If error occurred over 2 times, Observer.onError occurred 
    .retry(2) 
    .subscribe(new Observer<Integer>() { 
     @Override public void onCompleted() { 
      System.out.println("onCompleted"); 
     } 

     @Override public void onError(Throwable e) { 
      System.out.println("onError"); 
     } 

     @Override public void onNext(Integer integer) { 
      System.out.println(integer); 
     } 
    });