我发出请求并等待响应流上的该请求的响应。在等待时,更新流可能会有更新。无效扩展:直到其他流完成缓冲
我想在响应流完成时进行这些更新。就像这样:
Response (cold) | x y z |
Updates (hot) | 1 2 3 4 |
Result | x y z 123 4 |
我似乎无法得到它的权利。是否有一些聪明的运营商组合可以给我我需要的东西?
我发出请求并等待响应流上的该请求的响应。在等待时,更新流可能会有更新。无效扩展:直到其他流完成缓冲
我想在响应流完成时进行这些更新。就像这样:
Response (cold) | x y z |
Updates (hot) | 1 2 3 4 |
Result | x y z 123 4 |
我似乎无法得到它的权利。是否有一些聪明的运营商组合可以给我我需要的东西?
从问题的图:
缓存所有更新,直到响应完成,然后显示其他更新。 响应立即显示。
updates.TakeUntil(response.LastAsync())
.ToArray()
.SelectMany(x => x)
.Concat(updates)
.Merge(response);
对不起,延迟回复!这似乎是完全按照我需要的方式工作的,经过一番思考,我也明白了:)谢谢! – asgerhallas
这将工作,但事实上,你不能做到这一点作为一个班轮困扰我:
var updatesReplayed = updates.Replay();
updatesReplayed.Connect();
var results = response.Concat(updatesReplayed);
这里的测试演示功能:
// time | 1 2 3 4 5 6 7 8 9
// Response(cold) | x y z |
// Updates(hot) | 1 2 3 4 |
// Result | x y z 123 4 |
var scheduler = new TestScheduler();
var updates = scheduler.CreateHotObservable(
ReactiveTest.OnNext(100.ToMsTicks(), "1"),
ReactiveTest.OnNext(200.ToMsTicks(), "2"),
ReactiveTest.OnNext(400.ToMsTicks(), "3"),
ReactiveTest.OnNext(800.ToMsTicks(), "4"),
ReactiveTest.OnCompleted<string>(900.ToMsTicks())
);
var response = scheduler.CreateColdObservable(
ReactiveTest.OnNext(300.ToMsTicks(), "x"),
ReactiveTest.OnNext(400.ToMsTicks(), "y"),
ReactiveTest.OnNext(500.ToMsTicks(), "z"),
ReactiveTest.OnCompleted<string>(600.ToMsTicks())
);
var expectedResults = scheduler.CreateHotObservable(
ReactiveTest.OnNext(300.ToMsTicks(), "x"),
ReactiveTest.OnNext(400.ToMsTicks(), "y"),
ReactiveTest.OnNext(500.ToMsTicks(), "z"),
ReactiveTest.OnNext(600.ToMsTicks(), "1"),
ReactiveTest.OnNext(600.ToMsTicks(), "2"),
ReactiveTest.OnNext(600.ToMsTicks(), "3"),
ReactiveTest.OnNext(800.ToMsTicks(), "4"),
ReactiveTest.OnCompleted<string>(900.ToMsTicks())
);
var replayed = updates.Replay();
replayed.Connect();
var results = response.Concat(replayed);
var observer = scheduler.CreateObserver<string>();
results.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
这太好了,谢谢!我确实尝试了Replay(),但只能与StartWith结合使用,这在回顾过程中没有任何意义。这使得更多的意义:) – asgerhallas
一个问题,虽然...重播流停止缓存新消息,当它订阅?我知道这不是原始问题的一部分,但它是非常重要的,因为查询很长时间。 – asgerhallas
是的。测试结果表明,因为“4”立即弹出。 – Shlomo
只想大声。它确实看起来有点像这样:http://stackoverflow.com/questions/31362031/with-reactive-extensions-rx-is-it-possible-to-add-a-pause-command,但与检查流完成而不是布尔值。一旦解除暂停,永远不会再暂停。所以像hot.StartWithAndBufferUntilComplete(冷)。 – asgerhallas
可能有多个“响应”? – Shlomo
@Shlomo在Reponse流中可能会有多个事件(如上面的x y z),但一旦完成,就不会有更多的响应,如果这就是您的意思? – asgerhallas