2017-03-03 78 views
1

我有作为的GroupBy预计该工程的一些测试代码...使用的GroupBy用的毗连只返回第一组值

代码

var sw = Stopwatch.StartNew(); 
int groupSize = 5; 

var coreObservable = Observable 
    .Range(1, 20) 
    .Select((x, idx) => new { x, idx }) 
    .GroupBy(x => x.idx/groupSize, x => x.x) 
    .Select(x => x.ToList()) 
    .Replay() 
    .RefCount(); 

coreObservable.Subscribe(
    x => x.Subscribe(y => Console.WriteLine("Event raised [Books: {0}, Timestamp: {1}]", string.Join("|", y), sw.Elapsed)), 
    () => Console.WriteLine("Subcription closed")); 

coreObservable.Wait(); // blocking until observable completes 

输出

Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.3224002] 
Event raised [Values: 6|7|8|9|10, Timestamp: 00:00:00.3268353] 
Event raised [Values: 11|12|13|14|15, Timestamp: 00:00:00.3270101] 
Event raised [Values: 16|17|18|19|20, Timestamp: 00:00:00.3270803] 
Subcription closed 

问题是当我尝试用这个表达式使用Concat时...

代码

var sw = Stopwatch.StartNew(); 
int groupSize = 5; 

var coreObservable = Observable 
    .Range(1, 20) 
    .Select((x, idx) => new { x, idx }) 
    .GroupBy(x => x.idx/groupSize, x => x.x) 
    .Select(x => x.ToList()) 
    .Concat() // JUST ADDED THIS 
    .Replay() 
    .RefCount(); 

coreObservable.Subscribe(
    x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed), 
    () => Console.WriteLine("Subcription closed")); 

coreObservable.Wait(); // blocking until observable completes 

输出

Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.2728469] 
Event raised [Values: , Timestamp: 00:00:00.2791311] 
Event raised [Values: , Timestamp: 00:00:00.2793720] 
Event raised [Values: , Timestamp: 00:00:00.2794617] 
Subcription closed 

通知仅在第一组值露出。

我使用GroupBy而不是Buffer的原因是因为我试图用它来创建数据馈送的最大大小块,这些数据馈送以突发形式出现。原始的observable可能是项目的数组,当单个事件中的项目太多时,我想分割这些数组。

我想使用Concat的原因是因为我想能够在阵列事件之间创建延迟,就像很多人推荐here一样。

回答

2

Merge()替换Concat(),它能正常工作。

我相信你的问题的原因是,Concat()将不会开始听下一个序列,直到当前的一个完成。

的毗连图:

s1 --0--1--2-| 
s2   -5--6--7--8--| 
r --0--1--2--5--6--7--8--| 

虽然Merge()订阅了在同一时间的所有子序列,并发布每当任何子出版值的值。

合并图:

s1 --1--1--1--| 
s2 ---2---2---2| 
r --12-1-21--2| 

所以你的情况,该Concat()预订来自Select(x => x.ToList())第一IObservable<IList<int>>,公布值,直到它完成,然后订阅到下一个序列。 GroupBy()将为其找到的每个组创建一个新的IGroupedObservable流,但所有IGroupedObservables将同时完成:当基础流完成时。

因此Concat()侦听第一个流直到它完成,但是当第一个流完成时,所有其他流都完成了(因为它们实际上都是相同的序列,只是按键分割),所以没有值它发布以下序列。

所有的图都是从here借来的,这对于Rx来说是一个很好的资源,我强烈建议你看看有关各种操作员工作方式的问题。

+0

问题是我使用热可观察事实的一部分?所以也许所有这些事件在concat订阅之前就已经发生了。 – KrisG

+0

是的,这是确切的问题。所有的观察对象都是由GroupBy()立即“启动”的,但Concat只能一次一个地监听它们。 –

0

您的问题,可以减少这样的事情,这可能是简单的想起:

var sw = Stopwatch.StartNew(); 
var subject = new Subject<int>(); 

var o2 = subject.Where(i => i % 2 == 0).ToList(); 
var o3 = subject.Where(i => i % 3 == 0).ToList(); 
var o4 = subject.Where(i => i % 4 == 0).ToList(); 

var c = Observable.Concat(o2, o3, o4) 
//  .Replay() 
//  .RefCount() 
//.Replay().RefCount() has no impact here. 
    ; 

c.Subscribe(
    x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed), 
    () => Console.WriteLine("Subcription closed")); 

for(int i = 0; i < 6; i++) 
    subject.OnNext(i); 
subject.OnCompleted(); 

输出:

Event raised [Values: 0|2|4, Timestamp: 00:00:00.0002278] 
Event raised [Values: , Timestamp: 00:00:00.0002850] 
Event raised [Values: , Timestamp: 00:00:00.0003049] 
Subcription closed 

如果你是大理石图这些,它会是这样的:

s :| 
o2 : ------(024)| 
o3 : ------(03) | 
o4 : ------(04) | 

cOut: ------(024)| 
cSub: (So2)------(So3)(So4) 

cSub shows when c subscribes to child observables. cOut shows c's output. 
So2 means subscribe to o2, So3 means subscribe to o3, etc.. 

Concat订阅了传递给它的第一个observable,然后onl当当前一个完成时,y订阅随后的可观察值。在我们的例子中,ToList不会遗漏任何东西,直到源完成,当它转储整个列表时。所以o2,o3,全部同时完成,但c只订阅了o2o2完成后,它会尝试订阅其他人,但他们已经完成。

至于如何解决它,Merge会工作,但我猜你想在组2之前处理组1,其中Merge会中断。