2015-12-03 42 views

RxJava: how to do some processing before subscribe

I have a class like this:

Code1: 

class Container { 
    boolean success; 
    List<Book> books; 
} 

I want to change this code:

Code2: 

    Observable.just(createContainer()) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(container -> { 
       if (container.success) { 
        doSomethingWhenSuccess(); 

        for (Book book : container.books) { 
         doSomethingForBook(book); 
        } 
       } else { 
        doSomethingWhenFail(); 
       } 
      }); 

Code3: 

    Observable.just(createContainer()) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Subscriber<Container>() { 
       @Override 
       public void onCompleted() { 

       } 

       @Override 
       public void onError(Throwable e) { 
        doSomethingWhenFail(); 
       } 

       @Override 
       public void onNext(Container container) { 
        if (container.success) { 
         doSomethingWhenSuccess(); 

         Observable.from(container.books) 
           .subscribeOn(Schedulers.io()) 
           .observeOn(AndroidSchedulers.mainThread()) 
           .subscribe(book -> { 
            doSomethingForBook(book); 
           }); 
        } else { 
         doSomethingWhenFail(); 
        } 

       } 
      }); 

Is this the right way to do it?

Code4: 

    Observable.just(createContainer()) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .filter(container -> {     //step 2 
       if (container.success) { 
        doSomethingWhenSuccess(); 
       } else { 
        doSomethingWhenFail(); 
       } 
       return container.success; 
      }) 
      .flatMap(container -> Observable.from(container.books)) 
      .subscribe(book -> { 
       doSomethingForBook(book); 
      }); 
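The shape of Code4 (a `filter` with a side effect, then `flatMap` over the books) can be tried outside Android with `java.util.stream`, which has operators of the same names. This is only an analogy under stated assumptions: streams have no schedulers and no `onError`, and the `Book.title` field and the `log` list are hypothetical stand-ins for the real classes and the `doSomething...` calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class FilterFlatMapSketch {
    // Stand-in for the question's Book class (the title field is hypothetical).
    static class Book {
        final String title;
        Book(String title) { this.title = title; }
    }

    // Stand-in for the question's Container class.
    static class Container {
        final boolean success;
        final List<Book> books;
        Container(boolean success, List<Book> books) {
            this.success = success;
            this.books = books;
        }
    }

    // Runs filter -> flatMap -> forEach and records, in order, what happened.
    static List<String> process(Container container) {
        List<String> log = new ArrayList<>();
        Stream.of(container)
                .filter(c -> {
                    // Same side effect as the filter block in Code4.
                    log.add(c.success ? "success" : "fail");
                    return c.success;
                })
                .flatMap(c -> c.books.stream())
                .forEach(book -> log.add("book:" + book.title));
        return log;
    }

    public static void main(String[] args) {
        Container ok = new Container(true, Arrays.asList(new Book("A"), new Book("B")));
        Container failed = new Container(false, Arrays.asList(new Book("C")));
        System.out.println(process(ok));     // [success, book:A, book:B]
        System.out.println(process(failed)); // [fail]
    }
}
```

A failed container is dropped by the predicate, so none of its books reach the last step.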

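My questions are:

1. Should I use the filter() method?

2. Which thread does the filter block run on: UI_Thread or IO_Thread?

3. Can Code3 be simplified to Code4?

Sorry, my English is not good.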

It is very unclear what you are asking and what is not working.

Answers


Maybe you can use doOnNext and lift a custom operator:

class ContainerCheck implements Observable.Operator<Container, Container> { 
    @Override 
    public Subscriber<? super Container> call(Subscriber<? super Container> o) { 
     return new Subscriber<Container>() { 
      @Override 
      public void onCompleted() { 
       if (o.isUnsubscribed()) return; 
       o.onCompleted(); 
      } 

      @Override 
      public void onError(Throwable e) { 
       if (o.isUnsubscribed()) return; 
       o.onError(e); 
      } 

      @Override 
      public void onNext(Container container) { 
       if (o.isUnsubscribed()) return; 
       if (container.success) { 
        o.onNext(container); 
       } else { 
        o.onError(new Throwable("your custom exception")); 
       } 
      } 
     }; 
    } 
} 
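What the operator does (pass a successful container on, turn a failed one into an error) is the reactive counterpart of throwing an exception. A synchronous plain-Java sketch of that control flow is below. It is not RxJava code: the `check` and `handle` methods, the `IllegalStateException`, and the `log` list standing in for `doSomethingWhenSuccess()`, `doSomethingForBook(book)` and `doSomethingWhenFail()` are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ContainerCheckSketch {
    // Stand-in for the question's Container; books are plain strings for brevity.
    static class Container {
        final boolean success;
        final List<String> books;
        Container(boolean success, List<String> books) {
            this.success = success;
            this.books = books;
        }
    }

    // Counterpart of ContainerCheck.onNext: pass the value on, or signal an error.
    static Container check(Container container) {
        if (!container.success) {
            throw new IllegalStateException("container reported failure");
        }
        return container;
    }

    // Counterpart of the downstream steps: success hook, per-book work, error handler.
    static List<String> handle(Container container) {
        List<String> log = new ArrayList<>();
        try {
            Container checked = check(container);
            log.add("success");              // role of doSomethingWhenSuccess()
            for (String book : checked.books) {
                log.add("book:" + book);     // role of doSomethingForBook(book)
            }
        } catch (IllegalStateException e) {
            log.add("fail");                 // role of doSomethingWhenFail()
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(handle(new Container(true, Arrays.asList("A", "B")))); // [success, book:A, book:B]
        System.out.println(handle(new Container(false, Arrays.asList("C"))));     // [fail]
    }
}
```

The point of routing failure through the error channel is that the success path no longer needs an if/else: every step after the check can assume a successful container.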

Use ContainerCheck with lift:

Observable.just(createContainer(true)) 
     .subscribeOn(Schedulers.io()) 
     .lift(new ContainerCheck()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .doOnNext(container -> doSomethingWhenSuccess()) 
     .map(container -> container.books) 
     .flatMap(Observable::from) 
     .subscribe(book -> { 
      doSomethingForBook(book); 
     }, e -> doSomethingWhenFail());
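
On question 2: RxJava runs everything downstream of observeOn(...) on the scheduler passed to it. In Code4 the filter block comes after observeOn(AndroidSchedulers.mainThread()), so it runs on the main thread. In the chain above, ContainerCheck sits before observeOn and runs on the io scheduler, while doOnNext and the subscriber callbacks run on the main thread. Note also that Observable.just(createContainer()) calls createContainer() on whatever thread builds the chain, before subscribeOn has any effect.

The hand-off itself can be modeled with the JDK alone. The sketch below is only an analogy using CompletableFuture and two single-thread executors with made-up thread names (`io-thread`, `main-thread`); it is not RxJava code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {
    // Returns "<thread that produced the value> -> <thread that ran the downstream step>".
    static String run() {
        // Hypothetical stand-ins for Schedulers.io() and AndroidSchedulers.mainThread().
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            return CompletableFuture
                    // The source runs on the "io" executor (role of subscribeOn).
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // The downstream step runs on the "main" executor (role of observeOn).
                    .thenApplyAsync(
                            producer -> producer + " -> " + Thread.currentThread().getName(),
                            main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // io-thread -> main-thread
    }
}
```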