2014-08-27 66 views
1

我已经删除的样板去点rxjs zip不是懒惰?

// a.js

// My observables from stream and event 
this.a = Rx.Node.fromStream(this.aStream()); 
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem'); 

// Zip 'em 
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) { 
    return {item: s2, a: s1.toString()}; 
}); 

// Streams the lowercase alphabet 
rb.prototype.aStream = function aStream() { 
var rs = Readable(); 

var c = 97; 
rs._read = function() { 
    rs.push(String.fromCharCode(c++)); 
    console.log('Hit!'); 
    if (c > 'z'.charCodeAt(0)) { 
     rs.push(null); 
    } 
}; 

return rs; 
}; 

// b.js(需要上面导出模块)

rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...) in the module to trigger the itemSource observable 

我期望看到:

{item: 'a', a: 'a'}印在控制台

发生了什么事:

Hit!{item: 'a', a: 'a'}之前印刷24次。这意味着zip已从aStream中取得所有值,将它们缓存起来,然后执行它应该执行的操作。

我该如何获得相同的功能zip优惠但懒惰?我的目标是使用无限流/可见性,并将其与有限(异步)压缩在一起。通过可运行

编辑

查看/编辑:RX Zip test编辑基于答案更新2代码 - >没有输出了。

+0

请加回样板和简化的例子。 – 2014-08-27 12:50:58

+0

http://www.yoda.arachsys.com/csharp/complete.html – 2014-08-27 12:51:23

+0

@DaveSexton请参阅:http://pastebin.com/mnc82KuV和http://pastebin.com/8HxURWYc复制/粘贴/运行版本- 谢谢!我不认为这个例子可以更简化。这是2个流和zip功能。我已经包含了我用作参考的那个流,它可能已被排除,忽略该部分是安全的,但认为它可能是有用的。 – rollingBalls 2014-08-27 13:04:04

回答

1

zip确实很懒。它只订阅ab,并且在产生新值时进行工作。

您的问题是fromStream只要zip订阅了它,它就会同步发送所有值。发生这种情况是因为您的自定义Readable经常说“有更多数据可用!”

使你的Readable异步,你会得到所需的行为。

尝试这样的事情(未经测试)

var rs = Readable(); 
var subscription = null; 
rs._read = function() { 
    if (!subscription) { 
     // produce the values once per second 
     subscription = Rx.Observable 
      .generateWithRelativeTime(
       97, // start value 
       function (c) { return c > 'z'.charCodeAt(0); }, // end condition 
       function (c) { return c + 1; }, // step function 
       function (c) { return String.fromCharCode(c); }, // result selector 
       function() { return 1000; }) // 1000ms between values 
      .subscribe(
       function (s) { 
        rs.push(s); 
        console.log("Hit!"); 
       }, 
       function (error) { rs.push(null); }, 
       function() { rs.push(null); }); 
    } 
}; 
+0

谢谢你的回答。我更新了可运行的小提琴来使用你的代码,它只是输出“完成”,现在没有其他的事情发生。另外,即使我们通过在内部修改这个特定的流来解决这个问题,我的问题是,是否有办法采取任意流并使其不会溢出zip(以便我可以在任何项目中使用该模式)? – rollingBalls 2014-08-27 14:52:01

+0

我接受这个答案。在了解了更多关于Reactive Programming的知识之后,我想我需要重构我的方法。 – rollingBalls 2014-08-30 12:54:36