我有作为的GroupBy预计该工程的一些测试代码...使用的GroupBy用的毗连只返回第一组值
代码
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx/groupSize, x => x.x)
.Select(x => x.ToList())
.Replay()
.RefCount();
coreObservable.Subscribe(
x => x.Subscribe(y => Console.WriteLine("Event raised [Books: {0}, Timestamp: {1}]", string.Join("|", y), sw.Elapsed)),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
输出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.3224002]
Event raised [Values: 6|7|8|9|10, Timestamp: 00:00:00.3268353]
Event raised [Values: 11|12|13|14|15, Timestamp: 00:00:00.3270101]
Event raised [Values: 16|17|18|19|20, Timestamp: 00:00:00.3270803]
Subcription closed
问题是当我尝试用这个表达式使用Concat时...
代码
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx/groupSize, x => x.x)
.Select(x => x.ToList())
.Concat() // JUST ADDED THIS
.Replay()
.RefCount();
coreObservable.Subscribe(
x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
输出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.2728469]
Event raised [Values: , Timestamp: 00:00:00.2791311]
Event raised [Values: , Timestamp: 00:00:00.2793720]
Event raised [Values: , Timestamp: 00:00:00.2794617]
Subcription closed
通知仅在第一组值露出。
我使用GroupBy而不是Buffer的原因是因为我试图用它来创建数据馈送的最大大小块,这些数据馈送以突发形式出现。原始的observable可能是项目的数组,当单个事件中的项目太多时,我想分割这些数组。
我想使用Concat的原因是因为我想能够在阵列事件之间创建延迟,就像很多人推荐here一样。
问题是我使用热可观察事实的一部分?所以也许所有这些事件在concat订阅之前就已经发生了。 – KrisG
是的,这是确切的问题。所有的观察对象都是由GroupBy()立即“启动”的,但Concat只能一次一个地监听它们。 –