2017-02-24 57 views
1

我有一个事件源生成属于某些组的事件。我想缓冲这些组并将这些组(批量)发送到存储。到目前为止,我有这个:缓冲组按组进行反应扩展,嵌套订阅

eventSource 
    .GroupBy(event => event.GroupingKey) 
    .Select(group => new { group.Key, Events = group }) 
    .Subscribe(group => group.Events 
          .Buffer(TimeSpan.FromSeconds(60), 100) 
          .Subscribe(list => SendToStorage(list))); 

所以有一个嵌套的订阅组中的事件。不知何故,我认为有更好的方法,但我还没有弄清楚。

回答

3

这里的解决方案:

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Subscribe(list => SendToStorage(list)); 

这里有一对夫妇一般规则,可以帮助你 '减少':

1)嵌套订阅通常在嵌套订阅之前嵌入订阅,然后跟着Merge,然后嵌套订阅。因此应用的是,你会得到这样的:

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .Select(group => new { group.Key, Events = group }) 
    .Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector 
    .Merge() 
    .Subscribe(list => SendToStorage(list)); 

2)可以很明显的将两个连续的选择(既然你不匿名对象做任何事情,只需删除):

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Merge() 
    .Subscribe(list => SendToStorage(list)); 

3)最后,Select后跟一个Merge可以减少到一个SelectMany

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Subscribe(list => SendToStorage(list)); 
+1

一个优秀的,公呈现答案。 – Enigmativity

+0

写了一些单元测试,以验证它继续工作,并像一个魅力。谢谢! –

1

这里是一个办法做到这一点

(from g in eventSource.GroupByUntil(e => e.GroupingKey, 
            g => g.Buffer(TimeSpan.FromSeconds(60), 100)) 
from b in g.ToList() 
select b).Subscribe(SendToStorage);