2016-09-13 90 views
1

下面的单元测试永远不会打印“Async 3”,因为测试首先完成。我怎样才能确保它完成?我能想到的最好的方法是在最后使用任意的Task.Delay或WriteAsync()。结果既不理想。确保在进程终止之前完成异步OnNext代码

public async Task TestMethod1() // eg. webjob 
    { 
     TestContext.WriteLine("Starting test..."); 
     var observable = Observable.Create<int>(async ob => 
     { 
      ob.OnNext(1); 
      await Task.Delay(1000); // Fake async REST api call 
      ob.OnNext(2); 
      await Task.Delay(1000); 
      ob.OnNext(3); 
      ob.OnCompleted(); 
     }); 

     observable.Subscribe(i => TestContext.WriteLine($"Sync {i}")); 
     observable.SelectMany(i => WriteAsync(i).ToObservable()).Subscribe(); 

     await observable; 
     TestContext.WriteLine("Complete."); 
    } 

    public async Task WriteAsync(int value) // Fake async DB call 
    { 
     await Task.Delay(1000); 
     TestContext.WriteLine($"Async {value}"); 
    } 

编辑 我意识到提的单元测试很可能misleading.This不是一个测试问题。该代码模拟了在Azure WebJob中运行的进程的实际问题,生产者和消费者都需要调用一些Async IO。问题在于,在消费者真正完成之前,webjob会完成。这是因为我无法弄清楚如何正确地等待消费者的任何事情。也许这只是是不可能的RX ...

+0

你知道你正在创建三个独立的订阅到底层observable吗?这是你的意图吗?还是你想分享三个观察者中单个观察值的值? – Enigmativity

+0

3个订阅? '等待可观察'是否也会导致订阅?在真正的代码中,原始的可观察性很热,所以我可能应该在示例中使它变得很热门...... –

+0

是的,“await observable”确实会导致第三次订阅。您的第二次订阅“observable.SelectMany(i => WriteAsync(i).ToObservable())。Subscribe()'有一个延迟,所以'await observable'在它之前完成。这就是为什么你错过了“Async 3”。 – Enigmativity

回答

1

编辑: 你基本上是在寻找一个阻塞运算符。旧的阻止操作符(如ForEach)已被弃用,以支持异步版本。你要等待的最后一个项目,像这样:

public async Task TestMethod1() 
{ 
    TestContext.WriteLine("Starting test..."); 
    var observable = Observable.Create<int>(async ob => 
    { 
     ob.OnNext(1); 
     await Task.Delay(1000); 
     ob.OnNext(2); 
     await Task.Delay(1000); 
     ob.OnNext(3); 
     ob.OnCompleted(); 
    }); 

    observable.Subscribe(i => TestContext.WriteLine($"Sync {i}")); 
    var selectManyObservable = observable.SelectMany(i => WriteAsync(i).ToObservable()).Publish().RefCount(); 
    selectManyObservable.Subscribe(); 
    await selectManyObservable.LastOrDefaultAsync(); 
    TestContext.WriteLine("Complete."); 
} 

虽然这将解决您的眼前的问题,它看起来像你要保持运行到由于下面的问题(我增加了两个)。 Rx在正确使用时功能非常强大,如果没有使用,就会让人困惑。

老答案:


有两件事情:

  1. 。混合异步/等待和Rx通常会导致越来越两者的缺陷和没有好处。
  2. Rx具有强大的测试功能。你没有使用它。
  3. 副作用,如WriteLine最好在订阅中完全执行,而不是像SelectMany这样的操作员执行。
  4. 你可能想要冷却vs热的可观察物。
  5. 它没有跑完成的原因是因为你的测试跑步者。您的测试运行人员将在TestMethod1结束时终止测试。 Rx认购将以其他方式生活。当我在Linqpad运行你的代码,我得到以下的输出:

    开始测试...
    同步1
    同步2
    异步1
    同步3
    异步2
    完成。
    异步3

...这是我假设你想看到的,除了你可能想在完成异步3


后使用仅Rx,您的代码会是这个样子:

public void TestMethod1() 
{ 
    TestContext.WriteLine("Starting test..."); 
    var observable = Observable.Concat<int>(
     Observable.Return(1), 
     Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)), 
     Observable.Return(2), 
     Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)), 
     Observable.Return(3) 
    ); 

    var syncOutput = observable 
     .Select(i => $"Sync {i}"); 
    syncOutput.Subscribe(s => TestContext.WriteLine(s)); 

    var asyncOutput = observable 
     .SelectMany(i => WriteAsync(i, scheduler)); 
    asyncOutput.Subscribe(s => TestContext.WriteLine(s),() => TestContext.WriteLine("Complete.")); 
} 

public IObservable<string> WriteAsync(int value, IScheduler scheduler) 
{ 
    return Observable.Return(value) 
     .Delay(TimeSpan.FromSeconds(1), scheduler) 
     .Select(i => $"Async {value}"); 
} 


public static class TestContext 
{ 
    public static void WriteLine(string s) 
    { 
     Console.WriteLine(s); 
    } 
} 

这仍然没有利用Rx的测试功能。这看起来像这样:

public void TestMethod1() 
{ 
    var scheduler = new TestScheduler(); 
    TestContext.WriteLine("Starting test..."); 
    var observable = Observable.Concat<int>(
     Observable.Return(1), 
     Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler), 
     Observable.Return(2), 
     Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler), 
     Observable.Return(3) 
    ); 

    var syncOutput = observable 
     .Select(i => $"Sync {i}"); 
    syncOutput.Subscribe(s => TestContext.WriteLine(s)); 

    var asyncOutput = observable 
     .SelectMany(i => WriteAsync(i, scheduler)); 
    asyncOutput.Subscribe(s => TestContext.WriteLine(s),() => TestContext.WriteLine("Complete.")); 

    var asyncExpected = scheduler.CreateColdObservable<string>(
     ReactiveTest.OnNext(1000.Ms(), "Async 1"), 
     ReactiveTest.OnNext(2000.Ms(), "Async 2"), 
     ReactiveTest.OnNext(3000.Ms(), "Async 3"), 
     ReactiveTest.OnCompleted<string>(3000.Ms() + 1) //+1 because you can't have two notifications on same tick 
    ); 

    var syncExpected = scheduler.CreateColdObservable<string>(
     ReactiveTest.OnNext(0000.Ms(), "Sync 1"), 
     ReactiveTest.OnNext(1000.Ms(), "Sync 2"), 
     ReactiveTest.OnNext(2000.Ms(), "Sync 3"), 
     ReactiveTest.OnCompleted<string>(2000.Ms()) //why no +1 here? 
    ); 

    var asyncObserver = scheduler.CreateObserver<string>(); 
    asyncOutput.Subscribe(asyncObserver); 
    var syncObserver = scheduler.CreateObserver<string>(); 
    syncOutput.Subscribe(syncObserver); 
    scheduler.Start(); 
    ReactiveAssert.AreElementsEqual(
     asyncExpected.Messages, 
     asyncObserver.Messages); 

    ReactiveAssert.AreElementsEqual(
     syncExpected.Messages, 
     syncObserver.Messages); 
} 

public static class MyExtensions 
{ 
    public static long Ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms).Ticks; 
    } 

} 

...所以不像你的任务测试,你不必等待。测试立即执行。您可以将延迟时间提高到几分钟或几小时,TestScheduler将为您嘲笑时间。然后你的测试运动员可能会很高兴。

+0

如果有人能让我满意+1标记,我会非常感激。 – Shlomo

+0

我已经添加了一些更详细的问题,这可能有点模糊 - 这不是一个真正的RX测试问题,虽然这是我想了解更多关于 –

+0

谢谢。编辑答案。 – Shlomo

-1

好了,你可以使用Observable.ForEach阻塞,直到一个IObservable已终止:

observable.ForEach(unusedValue => { }); 

你能TestMethod1正常,非异步方法,和那么用这个替换await observable;

+0

不,它不幸产生相同的输出(ForEach也被弃用)。 –

+0

OP已经阻止了'await observable',但第二个订阅独立运行,并且因为它被延迟了,它会在稍后结束。 – Enigmativity

相关问题