2016-12-23 29 views
5

从消费者端取消可能被称为使用takeUntil,但这不一定非常动态。但是,在这种情况下,我希望取消等式中生产者方的Observable,就像您希望取消Promise链内的Promise一样(对于Native工具来说这不太可能)。从生产者端取消可观察者,而不是消费者端

说我有这个Observable从一个方法返回。 (这个队列库是一个简单的持久队列,可以读/写一个文本文件,我们需要锁定读/写,所以没有任何损坏)。

Queue.prototype.readUnique = function() { 

    var ret = null; 
    var lockAcquired = false; 

    return this.obsEnqueue 
     .flatMap(() => acquireLock(this)) 
     .flatMap(() => { 
      lockAcquired = true; 
      return removeOneLine(this) 
     }) 
     .flatMap(val => { 
      ret = val; // this is not very good 
      return releaseLock(this); 
     }) 
     .map(() => { 
      return JSON.parse(ret); 
     }) 
     .catch(e => { 
      if (lockAcquired) { 
       return releaseLock(this); 
      } 
      else { 
       return genericObservable(); 
      } 
     }); 

}; 

我有2个不同的问题 -

  1. 如果我不能获得锁,我怎么能“取消”可观察到的,只是发回一个空的可观测没有个结果?我真的不得不在每个回调函数中执行if/else逻辑来决定当前链是否被取消,如果是,返回一个空的Observable?通过空,我的意思是一个简单的onNext/onComplete触发的Observable,没有任何错误的可能性,并且没有onNext的任何值。从技术上讲,我不认为这是一个空的Observable,所以我正在寻找真正称为的东西,如果它存在的话。

  2. 如果你看一下这个代码特定顺序:

    .flatMap(() => acquireLock(this)) 
    .flatMap(() => { 
        lockAcquired = true; 
        return removeOneLine(this) 
    }) 
    .flatMap(val => { 
        ret = val; 
        return releaseLock(this); 
    }) 
    .map(() => { 
        return JSON.parse(ret); 
    }) 
    

我在做什么是存储参考在方法的顶部沤,然后再引用它了一步以后。我正在寻找的是一种将removeOneLine()的值传递给JSON.parse(),而不必在链外设置一些状态的方法(这简直是不雅观的)。

回答

3

1)这取决于你的方法acquireLock是如何工作的 - 但我假设,如果它不能获取锁,在这种情况下,你可以catch创建你流和后备流设置抛出一个错误空单:

return Rx.Observable.catch(
     removeLine$, 
     Rx.Observable.empty() 
    ); 

2)做足了状态的外部变量,你可以简单地链接一个mapTo

let removeLine$ = acquireLock(this) 
    .flatMap(() => this.obsEnqueue 
     .flatMap(() => removeOneLine(this)) 
     .flatMap(val => releaseLock(this).mapTo(val)) 
     .map(val => JSON.parse(val)) 
     .catch(() => releaseLock(this)) 
    ); 
+0

谢谢@olsn,我遵循第2部分(虽然我不知道地图和mapTo之间的区别),但关于第1部分,我不关注,换句话说,你介意解释吗? –

+0

part1基本上是代码块的最后4行(对不起,答案可能有点糟糕) map和mapTo之间的区别就是'mapTo'直接将参数传递给_map to_,而'map'则需要一个函数,所以'.mapTo(val)'可以写成'.map(()=> val') – olsn

+0

好的,我会按照我的理解编辑它,请随意编辑我的编辑:) –

2

根据您的定义取消,这是为了防止观察值向下游发送值。为了防止推值的观察到的,你可以使用过滤器:

它可以是简单的:

observable.filter(_ => lockAcquired) 

这只会发送通知下游如果lockAcquired是真实的。