2017-02-26 191 views
1

我的用例与将RxJava2与Firebase数据库一起使用有关。RxJava2从监听器创建Flowable并在最后删除监听器

我有DatabaseReference,我可以为它注册值监听器。 我可以转换成这样的可流动:

disposable = Flowable.create<DataSnapshot>({ s -> 
      dbRef.addValueEventListener(object : ValueEventListener { 
       override fun onCancelled(p0: DatabaseError) {...} 

       override fun onDataChange(value: DataSnapshot) { 
        s.onNext(value) 
       } 
      }) 
     }, BackpressureStrategy.BUFFER) 
     .subscribe(...) 

我想能够一次性设置时以除去侦听器。 任何想法我可以做到这一点?

我看到,在rxjava 1有this possibility也许,但它不是在rxjava2

回答

2

随着RxJava2提供您需要使用setCancellable()方法,并把你的听众移除代码在那里。
这与使用Observable.fromEmitter()创建Observable时RxJava1的Emitter.setCancellation()非常相似。

也借此说明akarnokd关于取消:
“但需要注意的是,除非创建逻辑放弃调度(通过终止或去异步),取消逻辑可能不会永远执行,由于同池活锁”。 (RxJava 2: always unsubscribe on the .subscribeOn(..) scheduler?