2016-02-12 47 views
1

我在我的应用程序中有2个服务。一种youTube服务,用于通过网络加载您的管理评论线索和评论以及管理评论批量加载的评论服务。评论服务特定于我的应用程序,youTube服务与应用程序无关。如何使用一个RXJs流为2个不同的东西

我有一个函数getCommentThreadsForChannel加载注释线程。这个主要的实现是在youTube服务中,但是在评论服务上调用这个服务,基本上只是调用从youTube服务返回observable。

只要我的控制器调用这个,这只是一个可观察的注释序列。但是,在我的commentService我想要在本地存储这些线程。我想批量存储所有线程,只要我有100多个线程,这样我就不会为每一个新的数据位处理列表。我想出了这个代码:

getCommentThreadsForChannel(): Rx.Observable<ICommentThread> { 
     var threadStream: Rx.Observable<ICommentThread> = 
      this.youTubeService.getCommentThreadsForChannel(); 

     threadStream 
      .bufferWithCount(100) 
      .scan((allItems, currentItem) => { 
       currentItem.forEach(thread => { 
        allItems.push(thread); 
       }); 

       console.log(`Save items to local storage: ${allItems.length}`) 

       return allItems; 
      }, [] ); 

     return threadStream; 
    } 

我认为这里配料线程和积累的所有线程进入一个阵列的逻辑是不错,但这些代码不会被调用。我认为这是因为我没有订阅这个线程。

我不想在这里订阅,因为它会订阅底层的流,然后我会有2订阅,所有的数据将加载两次(有很多数据 - 它需要大约一分钟的时间来加载它全部和超过30个100个线程的调用)。

我基本上想要一个do这里不会影响传递给控制器​​的流,但我想使用RXjs的缓冲和累积逻辑。

我相信,我需要共享或发布以某种方式流,但我有使用这些运营商之前,不能看我怎么会是能够做到这一点,不添加第二订阅收效甚微。

如何共享一个流并以两种不同方式使用它,而无需订阅两次?我可以创建某种被动的流,只有当它基于订阅的可观察性时才订阅吗?

回答

1

我最终想到了这一点(有一些来自@MonkeyMagiic的帮助)。

我不得不共用流,这样我可以在每个值做2件不同的事情的数据,缓冲区(IT)和个别。问题是这两个流都必须订阅,但我不想在服务中订阅 - 这应该在控制器中完成。

的解决方案是将2个流再次合并,并从缓冲忽略值:

var intervalStream = Rx.Observable.interval(250) 
    .take(8) 
    .do(function(value){console.log("source: " + value);}) 
    .shareReplay(1); 

var bufferStream = intervalStream.bufferWithCount(3) 
    .do(function(values){ 
     console.log("do something with buffered values: " + values); 
    }) 
    .flatMap(function(values){ return Rx.Observable.empty(); }); 

var mergeStream = intervalStream.merge(bufferStream); 

mergeStream.subscribe(
    function(value){ console.log("value received by controller: " + value); }, 
    function(error){ console.log("error: " + error); }, 
    function(){ console.log("onComplete"); } 
); 

输出:

"source: 0" 
"value received by controller: 0" 
"source: 1" 
"value received by controller: 1" 
"source: 2" 
"value received by controller: 2" 
"do something with buffered values: 0,1,2" 
"source: 3" 
"value received by controller: 3" 
"source: 4" 
"value received by controller: 4" 
"source: 5" 
"value received by controller: 5" 
"do something with buffered values: 3,4,5" 
"source: 6" 
"value received by controller: 6" 
"source: 7" 
"value received by controller: 7" 
"do something with buffered values: 6,7" 
"onComplete" 

JSBin

0

我相信你要找的是sharereplay的组合,谢天谢地RX有shareReplay(bufferSize)

var threadStream: Rx.Observable<ICommentThread> = this.youTubeService 
       .getCommentThreadsForChannel() 
       .shareReplay(1); 

getCommentThreadsForChannel(): Rx.Observable<ICommentThread> { 
     threadStream 
      .bufferWithCount(100) 
      .scan((allItems, currentItem) => { 
       currentItem.forEach(thread => { 
        allItems.push(thread); 
       }); 

       console.log(`Save items to local storage: ${allItems.length}`) 

       return allItems; 
      }, [] ); 

     return threadStream; 
    } 

使用组播运营商的份额将确保第一后,每个追加认购不作不必要的请求和重播,以确保所有新订阅收到的最后通知。

+0

哈,只注意到我们为同两个工作公司! – MonkeyMagiic

+0

值得注意的是,现在'shareReplay'已经从最新的RxJS 5测试版中移除。 [更多信息](http://stackoverflow.com/questions/35246873/sharereplay-in-rxjs-5)。 –

+0

感谢您的回复,但是我不敢担心。我认为问题在于线程流正在从该函数返回时订阅,但bufferWithCount和扫描流未订阅。正如我之前提到的,我不想在这里订阅,我只希望这在线程流订阅时工作。 – Roaders

相关问题