2015-10-18 72 views
2

我需要将分组的observables(形成相关组的笛卡尔积,但与问题无关)进行分组。使用#groupBy创建的RxJS #zip组

运行以下代码时,只有子可观察组实际上在#zip内部发出值 - 为什么?

https://jsbin.com/coqeqaxoci/edit?js,console

var parent = Rx.Observable.from([1,2,3]).publish(); 
var child = parent.map(x => x).publish(); 
var groupedParent = parent.groupBy(x => x); 
var groupedChild = child.groupBy(x => x); 

Rx.Observable.zip([groupedChild, groupedParent]) 
    .map(groups => { 
    groups[0].subscribe(x => console.log('zipped child ' + x)); // -> emitting 
    groups[1].subscribe(x => console.log('zipped parent ' + x)); // -> not emitting 
    }) 
    .subscribe(); 

groupedChild.subscribe(group => { 
    group.subscribe(value => console.log('child ' + value)); // -> emitting 
}); 

groupedParent.subscribe(group => { 
    group.subscribe(value => console.log('parent ' + value)); // -> emitting 
}); 

child.connect(); 
parent.connect(); 

编辑: 如user3743222在应答说明的,通过GROUPBY发射的基团是hot和订阅的父组(组[1])的所述第一值之后发生已经被排出。发生这种情况时#zip会等待groupedChild和groupedParent发出,后者会更快地发出(意味着它的组会在#zip函数运行之前发出值)。

回答

1

我修改您的代码如下:

var countChild = 0, countParent = 0; 
function emits (who) { 
    return function (x) {console.log(who + " emits : " + x);}; 
} 
function checkCount (who) { 
    return function () { 
    if (who === "parent") { 
     countParent++; 
    } 
    else { 
     countChild++; 
    } 
    console.log("Check : Parent groups = " + countParent + ", Child groups = " + countChild); 
    }; 
} 
function check (who, where) { 
    return function (x) { 
    console.log("Check : " + who + " : " + where + " :" + x); 
    }; 
} 
function completed (who) { 
    return function() { console.log(who + " completed!");}; 
} 
function zipped (who) { 
    return function (x) { console.log('zipped ' + who + ' ' + x); }; 
} 
function plus1 (x) { 
    return x + 1; 
} 
function err() { 
    console.log('error'); 
} 

var parent = Rx.Observable.from([1, 2, 3, 4, 5, 6]) 
    .do(emits("parent")) 
    .publish(); 
var child = parent 
    .map(function (x) {return x;}) 
    .do(emits("child")) 
// .publish(); 

var groupedParent = parent 
    .groupBy(function (x) { return x % 2;}, function (x) {return "P" + x;}) 
    .do(checkCount("parent")) 
    .share(); 

var groupedChild = child 
    .groupBy(function (x) { return x % 3;}, function (x) {return "C" + x;}) 
    .do(checkCount("child")) 
    .share(); 

Rx.Observable.zip([groupedChild, groupedParent]) 
// .do(function (x) { console.log("zip args : " + x);}) 
    .subscribe(function (groups) { 
       groups[0] 
        .do(function (x) { console.log("Child group observable emits : " + x);}) 
        .subscribe(zipped('child'), err, completed('Child Group Observable')); 
       groups[1] 
        .do(function (x) { console.log("Parent group observable emits : " + x);}) 
        .subscribe(zipped('parent'), err, completed('Parent Group Observable')); 
       }, err, completed('zip')); 

//child.connect(); 
parent.connect(); 

这里是输出:

"parent emits : 1" 
"child emits : 1" 
"Check : Parent groups = 0, Child groups = 1" 
"Check : Parent groups = 1, Child groups = 1" 
"Parent group observable emits : P1" 
"zipped parent P1" 
"parent emits : 2" 
"child emits : 2" 
"Check : Parent groups = 1, Child groups = 2" 
"Check : Parent groups = 2, Child groups = 2" 
"Parent group observable emits : P2" 
"zipped parent P2" 
"parent emits : 3" 
"child emits : 3" 
"Check : Parent groups = 2, Child groups = 3" 
"Parent group observable emits : P3" 
"zipped parent P3" 
"parent emits : 4" 
"child emits : 4" 
"Child group observable emits : C4" 
"zipped child C4" 
"Parent group observable emits : P4" 
"zipped parent P4" 
"parent emits : 5" 
"child emits : 5" 
"Child group observable emits : C5" 
"zipped child C5" 
"Parent group observable emits : P5" 
"zipped parent P5" 
"parent emits : 6" 
"child emits : 6" 
"Parent group observable emits : P6" 
"zipped parent P6" 
"Child Group Observable completed!" 
"Child Group Observable completed!" 
"Parent Group Observable completed!" 
"Parent Group Observable completed!" 
"zip completed!" 

有两点使这里:

  1. 拉链和团体行为通过与订阅时刻对比

    • GROUPBY创造可观的预期,无论是在父母和孩子

    有了这些值,你可以在日志中检查Child创建三个组,Parent创建两个

    • 邮编将等待在您作为参数传递的每个来源中都有一个值。在你的情况下,这意味着你将订阅孩子和父母分组 - 观察对象,当他们都将被发布。在日志中,只有在匹配"Check : Parent groups = 1, Child groups = 1"上的数字后,才会看到"Parent group observable emits : P1"

    • 然后,您可以订阅两个分组观察对象,并记录从那里出来的内容。这里的问题是,父组按照observable有值传递,但子组'observable'是在之前创建的,并且已经传递了它的值,所以当你在事实之后订阅时,你看不到那个值 - 但你会看到下一个。

    • 因此,[1-3]中的值将生成3个新的孩子,按照观察值分组,您不会看到任何这些孩子,因为您订阅太迟。但你会看到[4-6]中的值。您可以在日志中查看:"zipped child C4"

    • 由于您在创建后立即订阅它们,因此您将在父组中分类查看所有值。

  2. 连接和发布

    • 我没有完全清楚明白的连接和发布,但因为你的孩子有家长为源,你不需要延迟连接它。如果你连接到父母,孩子会自动开始发射它的值。因此我修改你的代码。

    • 这应该回答你的直接问题,但不是你的原始目标的笛卡尔产品。也许你应该把它作为一个问题来形成,并看看人们可以带来什么样的答案。

+0

谢谢!你能否解释为什么孩子observable在订阅#zip内容时已经传递了它的价值?我不明白这是怎么发生的......关于2:在我给出的例子中,发布并不是必须的,我只是把它包含在内,因为在完整的脚本中它是必要的。 –

+0

'groupBy'链接到'groupByUntil':'https:// github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/groupbyuntil.js'每次有一个新密钥即组),创建一个可观察对象。但是这个可观察的是一个“Rx.Subject”,它也是一个热门来源:它立即发布它的价值(参见L35)。这个观察者被打包并发送给观察者(L41,L49)。然后,该键(组)的值通过主体(立即)发出(L75)。简而言之,如果您在传递过程中没有立即订阅该观察值,您将始终失去第一个值。 – user3743222

+0

如果你不熟悉热和冷的可观察物,这里有两个资源处理此事:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold- vs-hot-observables; http://jaredforsyth.com/2015/03/06/visualizing-reactive-streams-hot-and-cold/ – user3743222