2017-04-27 146 views
1

我正在查看是否有一种方法可以从Observable同步返回一个缓存值,否则可能需要很长时间才能发出。当然,如果它需要执行它的io /计算,那么它应该在计算线程上执行它,但是如果它已经在之前完成了,那么它应该是同步的并且避免在线程之间来回跳转。下面是我的意思了一些示例代码:RxJava:同步/立即返回一个缓存的值

public void bind(ItemViewHolder holder) { 
    getCalculationObservable() 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(value -> { 
        holder.showValue(value); // This will happen after bind has finished 
       } 
      }); 
} 

public Observable<Integer> getCalculationObservable() { 
    if (mObservable == null) { 

     mObservable = Observable.fromCallable(this::calculate) 
       .subscribeOn(Schedulers.computation()) 
       .cache(); 

    } 
    return mObservable; 
} 

public int calculate() throws InterruptedException { 
    Thread.sleep(1000); 
    return mValue * 1000; 
} 

编辑:说明什么我谈论:

void onRunSchedulerExampleButtonClicked() throws InterruptedException { 


    Observable<Integer> observable = Observable 
      .fromCallable(this::calculate) 
      .subscribeOn(Schedulers.computation()) 
      .cache(); 

    observable 
      .doOnNext(value -> { 
       Log.e("log", "first onNext()"); 
      }) 
      .test().await(); 

    observable 
      .observeOn(AndroidSchedulers.mainThread()) 
      .doOnNext(value -> { 
       Log.e("log", "second onNext()"); 
      }) 
      .test().await(); 

    Log.e("log", "this is first."); 

} 

结果:

E/log: onClick 
E/log: first onNext() 
E/log: this is first. 
E/log: second onNext() 

为了进一步说明这一点,如果你在第二个onNext链上添加一个await(),你将永远不会完成它,因为它将等待在你阻塞的同一个线程中排队的东西。

+1

使用'BehaviourSubject'作为:订阅客户主题,并在内部使您的可观察推送项目进入该主题。这就是你可以如何执行后台线程和排放(主线程上的缓存/新值)。 – Than

回答

3

更新:

AndroidSchedulers.mainThread()调度应用observeOn,下游事件得到通过postDelayed内部贴到MessageQueue。这就是为什么位于第二个Observable之后的代码在Observable完成之前执行之前(或者如果我们使用test().await(),则冻结)。一种可能的解决方案是使用Subjects作为您的数据源和订户之间的代理。检查这篇文章的更多信息 - Keep Your Main Thread Synchronous

而且有用的文章:


解释为什么cache不切换线程:

您的Observable已同步返回缓存值,因为cache没有订阅每个订阅者的整个上游(因此在您的情况下它不会切换线程)。它做了一次,然后只记得项目的顺序。对于每个新用户,cache只是重播它。


实施例: (写在科特林

//here is the same logic as yours 
private var observable: Observable<Int>? = null 
    get() { 
     if(field==null) 
      field = Observable.fromCallable { 
       System.out.println("callable: execution thread - ${Thread.currentThread().name}") 
       Thread.sleep(1000) 
       [email protected] 1000 
      } 
        .subscribeOn(Schedulers.computation()) 
        .doOnNext  { System.out.println("cached Observable: before cache() - doOnNext execution thread - ${Thread.currentThread().name}") } 
        .doOnComplete { System.out.println("cached Observable: before cache() - doOnComplete execution thread - ${Thread.currentThread().name}") } 
        .cache() 
        .doOnNext  { System.out.println("cached Observable: after cache() - doOnNext execution thread - ${Thread.currentThread().name}") } 
        .doOnComplete { System.out.println("cached Observable: after cache() - doOnComplete execution thread - ${Thread.currentThread().name}") } 

     return field 
    } 

@Test 
fun test() { 
    observable!! 
      .doOnSubscribe { System.out.println("first get: doOnSubscribe execution thread - ${Thread.currentThread().name}") } 
      .doOnNext  { System.out.println("first get: doOnNext execution thread - ${Thread.currentThread().name}") } 
      .doOnComplete { System.out.println("first get: doOnComplete execution thread - ${Thread.currentThread().name}") } 
      .test() 
      .await() 

    System.out.println("---------- first get executed ------------") 

    observable!! 
      .doOnSubscribe { System.out.println("second get: doOnSubscribe execution thread - ${Thread.currentThread().name}") } 
      .doOnNext  { System.out.println("second get: doOnNext execution thread - ${Thread.currentThread().name}") } 
      .doOnComplete { System.out.println("second get: doOnComplete execution thread - ${Thread.currentThread().name}") } 
      .subscribe() 
} 

输出:

first get: doOnSubscribe execution thread - main 
callable: body execution thread - RxComputationThreadPool-1 
cached Observable: before cache() - doOnNext execution thread - RxComputationThreadPool-1 
cached Observable: after cache() - doOnNext execution thread - RxComputationThreadPool-1 
first get: doOnNext execution thread - RxComputationThreadPool-1 
cached Observable: before cache() - doOnComplete execution thread - RxComputationThreadPool-1 
cached Observable: after cache() - doOnComplete execution thread - RxComputationThreadPool-1 
first get: doOnComplete execution thread - RxComputationThreadPool-1 
---------- first get executed ------------ 
second get: doOnSubscribe execution thread - main 
cached Observable: after cache() - doOnNext execution thread - main 
second get: doOnNext execution thread - main 
cached Observable: after cache() - doOnComplete execution thread - main 
second get: doOnComplete execution thread - main 

正如你可以看到,当存在被缓存的值,线程没有按不会被切换。

P.S.我假设你使用RxJava2。

+0

其实,它没有。虽然第二个get会出现在主线程中,但它不会同步返回。相反,它会在循环中排队并稍后执行。这可以通过第二次获得后的日志轻松验证(当然,这里不能使用await/test)。 –

+0

值得注意的是,一个很大的区别是你的observable不会做observesOn,这似乎会强制排队。 observesOn(mainThread)基本上是一个要求,因为不能触及主线程和所有的视图。 –

+0

我忽略了observeOn故意显示当存在缓存值时线程不会切换。我已经编辑了添加大量日志记录调用的答案。还有 - 当有第二个获取 - 计算排程器不涉及。 –