2016-12-16 99 views
1

我是RxJS的新手,还在学习如何使用库。为concatMap文档提供了以下警告:使用RxJS进行内存管理Observable.concatMap?

警告:如果源值到达不休,比他们的 相应的内观测量速度可以完成的,它会导致内存 问题,如内观测量在无界缓冲区聚敛等待 他们将被订阅。

这对我来说是一个问题,因为我有一个内存密集但快速的concatMap提供一个慢concatMap。它的设置是这样的:

let uploadObs = Observable.range(0, blockCount).concatMap(blockIndex => { 
    // This part is fast and memory intensive. I'd like to use 
    // a bounded buffer here or something similar to control 
    // memory utilization 

    let blockReaderObs = ...; 
    // ... read a block from a large file object in blockReaderObs 
    return blockReaderObs; 
}).concatMap((blockData, index) => { 
    // This part involves a POST so is much slower than reading a 
    // file block 
    let objFromBlockData = someTransformation(blockData); 
    return this.http.post(someUrl, objFromBlockData) 
     .map(transformResponse); 
}); 

在RxJS中处理这类问题的正确方法是什么?

回答

2

这是一个典型的生产者 - 消费者问题。 您可以使用背压操作员来限制发送进行处理的元素的数量。请参阅controlled streams

+1

只有RxJS 4存在“受控” – paulpdaniels