2017-01-23 26 views
1

我发出请求并等待响应流上的该请求的响应。在等待时,更新流可能会有更新。无效扩展:直到其他流完成缓冲

我想在响应流完成时进行这些更新。就像这样:

Response (cold) |  x y z | 
Updates (hot)  | 1 2 3  4 | 

Result   |  x y z 123 4 | 

我似乎无法得到它的权利。是否有一些聪明的运营商组合可以给我我需要的东西?

+0

只想大声。它确实看起来有点像这样:http://stackoverflow.com/questions/31362031/with-reactive-extensions-rx-is-it-possible-to-add-a-pause-command,但与检查流完成而不是布尔值。一旦解除暂停,永远不会再暂停。所以像hot.StartWithAndBufferUntilComplete(冷)。 – asgerhallas

+0

可能有多个“响应”? – Shlomo

+0

@Shlomo在Reponse流中可能会有多个事件(如上面的x y z),但一旦完成,就不会有更多的响应,如果这就是您的意思? – asgerhallas

回答

2

从问题的图:

缓存所有更新,直到响应完成,然后显示其他更新。 响应立即显示。

updates.TakeUntil(response.LastAsync()) 
     .ToArray() 
     .SelectMany(x => x) 
     .Concat(updates) 
     .Merge(response); 
+0

对不起,延迟回复!这似乎是完全按照我需要的方式工作的,经过一番思考,我也明白了:)谢谢! – asgerhallas

1

这将工作,但事实上,你不能做到这一点作为一个班轮困扰我:

var updatesReplayed = updates.Replay(); 
updatesReplayed.Connect(); 

var results = response.Concat(updatesReplayed); 

这里的测试演示功能:

// time    | 1 2 3 4 5 6 7 8 9 
// Response(cold)  |  x y z | 
// Updates(hot)  | 1 2 3  4 | 
// Result    |  x y z 123 4 | 

var scheduler = new TestScheduler(); 
    var updates = scheduler.CreateHotObservable(
     ReactiveTest.OnNext(100.ToMsTicks(), "1"), 
     ReactiveTest.OnNext(200.ToMsTicks(), "2"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "3"), 
     ReactiveTest.OnNext(800.ToMsTicks(), "4"), 
     ReactiveTest.OnCompleted<string>(900.ToMsTicks()) 
    ); 

    var response = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(300.ToMsTicks(), "x"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "y"), 
     ReactiveTest.OnNext(500.ToMsTicks(), "z"), 
     ReactiveTest.OnCompleted<string>(600.ToMsTicks()) 
    ); 

    var expectedResults = scheduler.CreateHotObservable(
     ReactiveTest.OnNext(300.ToMsTicks(), "x"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "y"), 
     ReactiveTest.OnNext(500.ToMsTicks(), "z"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "1"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "2"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "3"), 
     ReactiveTest.OnNext(800.ToMsTicks(), "4"), 
     ReactiveTest.OnCompleted<string>(900.ToMsTicks()) 
    ); 

    var replayed = updates.Replay(); 
    replayed.Connect(); 

    var results = response.Concat(replayed); 
    var observer = scheduler.CreateObserver<string>(); 
    results.Subscribe(observer); 

    scheduler.Start(); 
    ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages); 
+0

这太好了,谢谢!我确实尝试了Replay(),但只能与StartWith结合使用,这在回顾过程中没有任何意义。这使得更多的意义:) – asgerhallas

+0

一个问题,虽然...重播流停止缓存新消息,当它订阅?我知道这不是原始问题的一部分,但它是非常重要的,因为查询很长时间。 – asgerhallas

+0

是的。测试结果表明,因为“4”立即弹出。 – Shlomo