2017-10-15 58 views
0

有可观察的,返回的东西阵列/列表:可观察如何批量添加到由rxjs observables返回的数组/列表中?

而且我有一个用例在哪里是这个观察到的下游消费有更多的项目加入到这个名单相当昂贵的事。所以我想减慢对这个列表的添加量,但是没有任何松动。

类似于一个操作符,它使用此可观察值并返回具有相同签名的另一个可观察值,但是无论何时新列表被推入并且其上的项目多于上次,则一次只添加一个或几个。

因此,如果上次推送是一个包含3个项目的列表,并且下一个推送项目是6个项目,并且批量大小为1,那么这一个列表推送将被拆分为10个单独推送列表,大小为:4,5,6

所以添加成批,这样消费者可以更容易地跟上新添加到列表中。或者,在处理阵列/列表中的其他项目时,消费者不必每次都拖延太久,因为添加被分割并分布在可配置的批量大小上。

回答

0

我做了一个addAdditionalOnIdle运算符,您可以使用let运算符将其应用于任何rxjs observable。它需要一个batchSize参数,因此您可以配置批量大小。它也需要一个dontBatchAfterThreshold,它在一定的列表大小后停止列表的批处理,这对我的目的很有用。

该实现在内部使用新的requestIdleCallback函数来计划在浏览器中存在空闲时间时批量推送的附加项目。此功能不可用在IE或Safari,但我发现这个外观极好填充工具吧,这样你可以使用它今天反正:https://github.com/aFarkas/requestIdleCallback :)

见addAdditionalOnIdle的下面的实施和用法示例:

function addAdditionalOnIdle(
 
    batchSize = 1, 
 
    dontBatchAfterThreshold = 22, 
 
) { 
 
    return (source) => { 
 
    let idleCallback; 
 
    let currentPushedItems = []; 
 
    let lastItemsReceived = []; 
 
    return Rx.Observable.create((observer) => { 
 
     let sourceSubscription = source 
 
     .subscribe({ 
 
     complete:() => { 
 
      observer.complete(); 
 
     }, 
 
     error: (error) => { 
 
      observer.error(error); 
 
     }, 
 
     next: (items) => { 
 
      try { 
 
      lastItemsReceived = items; 
 

 
      if (idleCallback) { 
 
       return; 
 
      } 
 

 
      if (items.length > currentPushedItems.length) { 
 
       const idleCbFn =() => { 
 
       if (currentPushedItems.length > lastItemsReceived.length) { 
 
        throw new Error('currentPushedItems should never be larger than lastItemsReceived.'); 
 
       } 
 

 
       const from = currentPushedItems.length; 
 
       const to = from + batchSize; 
 
       const last = lastItemsReceived.length; 
 
       if (from < dontBatchAfterThreshold) { 
 
        for (let i = from ; i < to && i < last ; i++) { 
 
        currentPushedItems[i] = lastItemsReceived[i]; 
 
        } 
 
       } else { 
 
        currentPushedItems = lastItemsReceived; 
 
       } 
 

 
       if (currentPushedItems.length < lastItemsReceived.length) { 
 
        idleCallback = window.requestIdleCallback(() => { 
 
        idleCbFn(); 
 
        }); 
 
       } else { 
 
        idleCallback = undefined; 
 
       } 
 

 
       observer.next(currentPushedItems); 
 
       }; 
 
       idleCallback = window.requestIdleCallback(() => { 
 
       idleCbFn(); 
 
       }); 
 
      } else { 
 
       currentPushedItems = items; 
 
       observer.next(currentPushedItems); 
 
      } 
 
      } catch (error) { 
 
      observer.error(error); 
 
      } 
 
     }, 
 
     }); 
 

 
     return() => { 
 
     sourceSubscription.unsubscribe(); 
 
     sourceSubscription = undefined; 
 
     lastItemsReceived = undefined; 
 
     currentPushedItems = undefined; 
 
     if (idleCallback) { 
 
      window.cancelIdleCallback(idleCallback); 
 
      idleCallback = undefined; 
 
     } 
 
     observer = undefined; 
 
     }; 
 
    }); 
 
    }; 
 
} 
 

 
function sleep(milliseconds) { 
 
    var start = new Date().getTime(); 
 
    for (var i = 0; i < 1e7; i++) { 
 
    if ((new Date().getTime() - start) > milliseconds){ 
 
     break; 
 
    } 
 
    } 
 
} 
 

 
let testSource = Rx.Observable.of(
 
    [1,2,3], 
 
    [1,2,3,4,5,6], 
 
) 
 
.concat(Rx.Observable.never()); 
 

 
testSource 
 
.let(addAdditionalOnIdle(2)) 
 
.subscribe((list) => { 
 
    // Simulate a slow synchronous consumer with a busy loop sleep implementation 
 
\t sleep(1000); 
 
\t document.body.innerHTML += "<p>" + list + "</p>"; 
 
});
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>

相关问题