2017-10-09 97 views
2

我想答案是forkJoin/Promises.all ...但它多一点,请与我裸...异步操作的可观测(主题)时,中间有没有重叠的异步操作是完成

所以......我有一个承诺的来源,可以随机顺序到达,我需要一些方式来说“当所有承诺到期为止,完成后让我知道”。

在一个基于Promise的解决方案中,我最初考虑使用Promise.all,但承诺可能仍然“到达”,而其他人尚未完成。有趣的是,对于“迭代Promise.all”有一个整洁的解决方法,在https://stackoverflow.com/a/37819138/239168

我试图做到这一点的方式。稍后阅读文档,我认为forkJoinPromise.all等效,但是,同样的问题,我没有时间可以安全地呼叫forkJoinPromise.all,因为总是可以再添加一个,而另一个仍处于待定状态。因为我现在可能没有意义,所以我想我会要求一些指导。

设置

(握住你的笑,如果这是愚蠢的,我是新来的Rx ...)

我有一个主题,我想,当知道所有的承诺,在这是完整的......还总是可以在任何时候得到新的补充承诺......

private promiseSource = new Subject<Promise<any>>(); 
promises$ = this.promiseSource.asObservable(); 

每当一个新的承诺“到达”,我只是将它添加到受

this.promiseSource.next(somePromise); 

我想要奇迹般地发生的事情是 - 只要它拥有完整的承诺,就让主题“完成”。

例如

promises$.magicFlatMapForkJoinConcatMapTrickery().subscribe({ 
    next: x => ..., 
    error: err => ..., 
    complete:() => { 
    console.log('all promises we got so far are done'); 
    // nice to have, I want this to keep "listening" for new promises 
    promiseSource.youAreNotREALYCompletePleaseReset(); 
    } 
}); 

或者换句话说,我有一个观察的异步操作的,如果我们看一看的内容,我们可以看到重叠的异步操作,我想知道什么时候有没有重叠例如

|<-async action 1->| |<-async action 3->| 
      |<-async action 2->|      |<-async action 4->| 

              /\  /\ 
             find this gap 

如果这些是例如http调用,我基本问 - 告诉我什么时候没有打开http调用。

TL;博士

如何落实这一承诺在RxJS世界基于答案...

https://stackoverflow.com/a/37819138/239168

回答

1

我能想到的做着这基于一个相当简单的方法先前的回答。您可以使用fromPromise将您的Subject<Promise<any>>变成Subject<Observable<any>>,然后您可以使用this答案中描述的active函数将其降至可观察的活动可观察值。一旦你有了这些,你可以将你的查询短语定义为“当活动流数组变为空时”,这可以用一个简单的过滤器来完成,例如,:

active(yourSubjectOfObservables).filter(x => x.length === 0).subscribe(() => { 
    // here we are, all complete 
}); 

这将每次触发活动数据流转换的次数为零,所以如果你只是想在第一时间,将一个.take(1)或。 first之间的过滤器和订阅。

可能不是最漂亮的解决方案,但它在概念上很简单。

+0

酷,会试试看......我还在攀登悬崖:) –

2

如果我正确解释你的问题,你只对表示是否有未决承诺的信号感兴趣。

使用mergescan来创建一个观测值可以很容易地发出未决承诺的计数,因此,您应该可以创建任何您喜欢的信号。

基本上,每次受试者发出承诺时,应该增加未决承诺的次数。每当这些承诺中的一个解决时,计数可以递减。

const promises = new Rx.Subject(); 
 

 
const pendingCount = Rx.Observable 
 
    .merge(
 
    promises.mapTo(1), 
 
    promises.mergeMap(p => Rx.Observable.from(p).mapTo(-1)) 
 
) 
 
    .scan((acc, value) => acc + value, 0) 
 
    .do(count => console.log(`${count} pending promise(s)`)); 
 

 
const doneSignal = pendingCount 
 
    .filter(count => count === 0) 
 
    .mapTo("done"); 
 

 
doneSignal.subscribe(signal => console.log(signal)); 
 

 
const timeoutPromise = (delay) => new Promise(resolve => setTimeout(resolve, delay)); 
 

 
promises.next(timeoutPromise(200)); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 100); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 300); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>