2017-09-28 24 views
1

我正在处理某些正在记录来自队列的数据。将队列处理成Observable非常容易,以便我的代码中可以有多个端点接收队列中的信息。包含确认的观察模式

此外,我可以确定信息按顺序到达。这个位很好地工作,因为Observables确保这一点。 但是,一个棘手的问题是,我不希望观察者在下一件事完成处理之前得到通知。但Observer完成的处理是异步的。

作为一个更具体的例子,可能很简单。想象一下我的队列包含URL。我在我的代码中将这些视为一个Observable。我订阅了一个观察者,他的工作是获取URL并将内容写入磁盘(这是一个人为的例子,所以不要对这些具体问题置疑)。重要的一点是提取和保存是异步的。我的问题是,我不希望观察者在Observable获得“下一个”URL之后才能完成前面的处理。

但Observer接口上的next的调用返回void。所以Observer没有办法让我回到已经完成异步任务的我。

有什么建议吗?我怀疑有可能是某种可以编码的操作符,它会基本上忽略未来的值(将它们排列在内存中?),直到它以某种方式知道Observer已经准备好了。但我希望按照某种既定模式已经存在这样的事情。

回答

1

类似用途的情况下,我跑进前

window.document.onkeydown=(e)=>{ 
 
    return false 
 
} 
 
let count=0; 
 
let asyncTask=(name,time)=>{ 
 
    time=time || 2000 
 
    return Rx.Observable.create(function(obs) { 
 
     setTimeout(function() { 
 
     count++ 
 
     obs.next('task:'+name+count); 
 
      console.log('Task:',count ,' ', time, 'task complete') 
 
     obs.complete(); 
 
     }, time); 
 
    }); 
 
} 
 

 
let subject=new Rx.Subject() 
 
let queueExec$=new Rx.Subject() 
 

 

 
Rx.Observable.fromEvent(btnA, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('A',4000)) 
 
}) 
 

 
Rx.Observable.fromEvent(btnB, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('B',4000)) 
 
}) 
 

 
Rx.Observable.fromEvent(btnC, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('C',4000)) 
 
}) 
 

 
    queueExec$.concatMap(value=>value) 
 
    .subscribe(function(data) { 
 
     console.log('onNext', data); 
 
    }, 
 
    function(error) { 
 
     console.log('onError', error); 
 
    },function(){ 
 
console.log('completed') 
 
});

+0

所以,如果我明白这一点,你有'asyncTask'返回一个Observable,而不是一个值。该Observable发布了一个值,然后关闭。但是,当你运行'concatMap'将它们连接在一起时,效果是你将不会从下一个Observable获得值,直到第一个*完成*(当异步完成时它会这样做)。这是一个有趣的想法。我发现了一些更适合我的案例,但这对于其他用例来说绝对是一个有趣的想法。 –

+0

有了这个模式,你可以在异步队列中抛出任何东西,只要它是可观察的。在某些情况下,它更加灵活 –

1

你描述的如 “反压” 的声音。您可以在RxJS 4文档https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/backpressure.md中阅读它。但是这里提到RxJS 5中不存在的操作符。例如,请查看“受控观察点”,它应该引用您需要的内容。

我想你可以实现与concatMap相同,Subject的实例:

const asyncOperationEnd = new Subject(); 

source.concatMap(val => asyncOperationEnd 
    .mapTo(void 0) 
    .startWith(val) 
    .take(2) // that's `val` and the `void 0` that ends this inner Observable 
) 
    .filter(Boolean) // Always ignore `void 0` 
    .subscribe(val => { 
    // do some async operation... 
    // call `asyncOperationEnd.next()` and let `concatMap` process another value 
    }); 

来回你的描述,它实际上似乎是“观察员”你提的类似主题的作品所以它将使更多的可能感觉要制作一个可以在任何Observable链中使用的自定义Subject类。