2017-02-21 43 views
0

你好,我想知道 xstream 中是否有与 RxJS 的 `zip` 操作符等效的操作符,或者至少有某种方式可以获得相同的行为。如果有人需要了解两者的差异,可以参考下面的弹珠图(marble diagram)。xstream 中有与 RxJS zip 操作符等效的操作符吗?

zip in rxjs 
    |---1---2---3-----------5-> 
    |-a------b------c---d-----> 
      "zip" 
    |-1a----2b------3c-----5d-> 


whereas 'combineLatest' aka 'combine' in xstream does 

    |---1---2----------4---5-> 
    |----a---b---c---d-------> 
      "combine" 
    |-1a----2a-2b-2c-2d-4d-5d> 

非常感谢任何帮助,因为我刚开始接触基于流的编程。先谢谢了!

回答

1

我也需要一个用于 xstream 的 zip 操作符,所以我用现有的操作符自己实现了一个。它可以接受任意数量的流进行 zip 合并。

/**
 * Builds a zip operator for use with a stream's `compose` method.
 *
 * The resulting stream pairs events by arrival position: the n-th event of the
 * composed stream is combined with the n-th event of every stream in
 * `streams`, and the combination is emitted as an array
 * `[sourceEvent, ...otherEvents]` once every stream has produced its n-th
 * event. Events that arrive before their counterparts are buffered, so the
 * buffers grow for as long as one stream keeps emitting ahead of the others.
 *
 * Relies on `xs` being in scope (used for `xs.merge`).
 *
 * @param {...object} streams - Streams to zip with the composed stream; each
 *   must provide `map`.
 * @returns {function(object): object} Operator that takes the source stream
 *   and returns a stream of arrays, one per complete set of events.
 */
function zip(...streams) {
  // Tag every event with the index of the stream it came from so it can be
  // routed to the right queue after merging. Index 0 is reserved for the
  // stream that the operator is applied to.
  const labeledOthers = streams.map((stream$, idx) =>
    stream$.map((event) => ({ label: idx + 1, event })),
  );

  return (event$) => {
    const labeledSource$ = event$.map((event) => ({ label: 0, event }));
    const labeledStreams = [labeledSource$, ...labeledOthers];

    // One FIFO queue per stream. `tuple` is null unless the latest event
    // completed a set.
    const seed = { queues: labeledStreams.map(() => []), tuple: null };

    return xs
      .merge(...labeledStreams)
      // The reducer treats its accumulator as immutable: the seed object is
      // handed to `fold` once, so mutating it in place would leave stale
      // buffered events behind if the fold is ever restarted from that seed.
      .fold(({ queues }, { label, event }) => {
        const appended = queues.map((queue, idx) =>
          idx === label ? [...queue, event] : queue,
        );

        // Some stream has not delivered its next event yet: keep buffering.
        if (appended.some((queue) => queue.length === 0)) {
          return { queues: appended, tuple: null };
        }

        // Every queue has a head: emit the heads and drop them.
        return {
          queues: appended.map((queue) => queue.slice(1)),
          tuple: appended.map((queue) => queue[0]),
        };
      }, seed)
      // Only let complete tuples through.
      .filter((state) => state.tuple !== null)
      .map((state) => state.tuple);
  };
}

它可以配合 `compose` 使用。

// Pair up events from foo$ and bar$ by arrival position, then hand each pair
// to doSomething.
foo$
  .compose(zip(bar$))
  .map((pair) => {
    const [foo, bar] = pair;
    return doSomething(foo, bar);
  });