2016-09-27 86 views
0

我正面临RxJS问题。选择正确的RxJS构造以确保一个订阅在另一个订阅完成之前完成

我的应用正在设计如下:我有两个不同的客户端:ClientA & ClientB预订了两个不同的观测量:ObservableA & ObservableB

请注意,该应用程序还会变动一个名为aVariable的变量。

这里是流:

  1. ClientA订阅了ObservableA
  2. ClientB订阅ObservableB
  3. ObservableB订阅阅读falseaVariable并完成。
  4. ObservableA订阅集aVariabletrue并完成(晚于ObservableB)。

而什么是真正意图是为ObservableA S“的订阅之前ObservableB完成”,让ClientB将从aVariabletrue ...或者换一种方式,一定程度上保证ObservableB的订阅等待直到其他订阅已完成。

我不确定用什么RxJS结构来实现我想要的(我目前使用普通Observable)。我相信我在这里需要的不仅仅是简单的Observable ...

有人可以帮忙吗?

P.S. 注意aVariable是在NGRX店举行,但我不认为这是有关这个问题...

P.P.S. 以上是我的真实应用程序的简化。

+0

通常,当您想要运行异步任务以便使用'concat'或'concatMap'。看到也许类似的问题http://stackoverflow.com/questions/39566268/angular-2-rxjs-how-return-stream-of-objects-fetched-with-several-subsequent/39578646#39578646和http:// stackoverflow。 com/questions/36713531/how-to-use-exhaustmap-in-reactivex-rxjs-5-in-typescript/39589408#39589408 – martin

+0

Hi Martin。唯一的麻烦是那些是不同的客户。因此不可能使用concat。 – balteo

+0

你可以给你一个代表什么样的代码样本吗?我不清楚:什么是“客户端A”?你是什​​么意思你完成订阅?你的意思是“普通”的“可观察”。 – paulpdaniels

回答

1

我认为你可以在你散发出的值时streamB被预订了中间主题解决您的问题:streamB被订阅了它只有在

const completeStreamA = new Rx.Subject(); 
 

 
const streamA = Rx.Observable.never() 
 
    .takeUntil(completeStreamA); 
 

 
const streamB = Rx.Observable.of('aValueOnStreamB') 
 
    .do(() => completeStreamA.next('complete stream A')); 
 

 
//clientA subscribes immediately 
 
streamA.subscribe(
 
    next => console.log('a->next->'+next), 
 
    err => console.log('a->error->' + err.message), 
 
() => console.log('a->complete') 
 
); 
 

 
setTimeout(() => { 
 
    //simulate later subscription by clientB 
 
    streamB.subscribe(
 
    next => console.log('b->next->'+next), 
 
    err => console.log('b->error->' + err.message), 
 
    () => console.log('b->complete') 
 
); 
 
}, 3 * 1000); 
 
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

会接下来是将完成streamA的completeStreamA主题的值。以上代码的输出:

a->complete 
b->next->aValueOnStreamB 
b->complete 
相关问题