2016-12-16 113 views
3

我正在教我自己编写反应式编程,通过随机问题来解决问题,并且毫不羞耻地询问愚蠢的新手问题。在弄清楚线程调度如何工作的同时,我设法让自己陷入困境。虽然我很确定这段代码没有逻辑意义,但我也不明白发生了什么。指出这可能会帮助我。下面的代码:SubscribeOn()在这里做什么?

var testScheduler = new TestScheduler(); 
var newThreadScheduler = new NewThreadScheduler(); 

var emitter = new Subject<string>(); 
testScheduler.Schedule(TimeSpan.FromSeconds(0.1),() => emitter.OnNext("one")); 
testScheduler.Schedule(TimeSpan.FromSeconds(0.2),() => emitter.OnCompleted()); 

var subscription = emitter.SubscribeOn(newThreadScheduler) 
          .Subscribe(
           item => Console.WriteLine(item), 
           error => Console.WriteLine(error), 
           () => Console.WriteLine("Complete!") 
          ); 

testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks); 

Console.WriteLine("DONE."); 
Console.ReadLine(); 

预期是什么,也许:

one  
DONE. 
Complete! 

可能出现的交织,因为我不太清楚SubscribeOn()会做。我得到的是:

DONE. 
Complete! 

究竟发生了什么?为什么该项目在完成之前没有生成?在这种情况下ObserveOn()的工作方式与我的预期一样,我理解为什么:它在其他某个线程上运行代理,并且它们可以与“完成”交错。那么SubscribeOn()究竟做了什么?

+0

我什么都不知道的反应,而是看你的代码,你'newThreadScheduler'认购,但所有的工作和日程安排都对'testScheduler'触发。 – TyCobb

回答

2

你在这里只是一个竞赛条件。

如果我们回来翻录的所有代码,只是

var emitter = new Subject<string>(); 
emitter.OnNext("one"); 
emitter.OnCompleted(); 

var subscription = emitter 
          .Subscribe(
           item => Console.WriteLine(item), 
           error => Console.WriteLine(error), 
           () => Console.WriteLine("Complete!") 
          ); 



Console.WriteLine("DONE."); 
Console.ReadLine(); 

我们得到了相同的结果。 通过使用Subject<T>,您将不会获得任何缓存行为,但OnCompleted通知的例外情况除外。

SubscribeOn运营商将安排在提供的IScheduler实例上完成任何订阅工作。 在订阅Subject<T>的情况下,几乎没有工作要做。 这几乎和将回调注册到回调列表一样简单。

安排到NewThreadScheduler的工作将创建一个新线程,然后创建一个内部事件循环来处理计划工作。 这很快,但确实需要创建一个新线程,一个EventloopScheduler并执行上下文切换到新线程。

在您的示例中,您安排了上的OnNextOnCompleted通知。 然后你SubscribeOnNewThreadScheduler。 接下来,您开始处理TestScheduler实例的所有计划工作。 这些虚拟处理的计划项目,只是迭代计划的项目,执行委托和推进虚拟时钟。 这非常快。

更具体,下面的代码是类似于你所写的内容

var newThreadScheduler = new NewThreadScheduler(); 

var callbacks = new List<Action<string>>(); 
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str))); 

foreach (var callback in callbacks) 
{ 
    callback("one"); 
} 

Console.WriteLine("Done"); 

在这里,我们只是有回调操作的列表(打电话给他们的用户或观察员)。 然后,我们在一个新线程上异步计划添加其中一个回调。 然后立即迭代回调并将字符串“one”发送给它们中的每一个。 结果是

Done 

NewThreadScheduler只是没有获得足够的时间来启动一个新的线程,调度动作,然后执行该操作,之前主线程可以通过收集迭代。

所以有几条指导原则,我认为你没有遵循: 1)避免主题;-) 2)不要混合使用线程和单元测试。我假设TestScheduler的存在是因为你正在测试这个。但是,您可以使用两个TestScheduler例如背景和前景实例。

为了提供更多帮助,我将提供建议您仅从测试中删除第二个调度程序的积极指导。 在您的SubscribeOn运营商中使用TestScheduler实例。

接下来,我建议用TestScheduler的可观测序列工厂方法(即CreateColdObservable)替换主题+调度的使用。 最后,我不知道是否前进到1秒的speicifc时间获得任何东西,而不仅仅是使用Start方法。 我认为这会减少噪音和使用魔法值1s。

var testScheduler = new TestScheduler(); 

var source = testScheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"), 
    ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks)); 

var subscription = source.SubscribeOn(testScheduler) 
          .Subscribe(
           item => Console.WriteLine(item), 
           error => Console.WriteLine(error), 
           () => Console.WriteLine("Complete!") 
          ); 

testScheduler.Start(); 

Console.WriteLine("DONE."); 
Console.ReadLine(); 

现在唯一的问题是,SubscribeOn调用是相当多余的。

FYI:代码为NewThreadScheduler - https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs

+0

感谢这个非常详细的答案,这不是真正的生产代码,只是试图弄清楚事情是如何工作的,所以这使得它有点奇怪!这听起来像将TestScheduler与'真正'的调度程序混合会导致更多问题,所以我需要调整我的实验方式。 – OwenP

+0

是同意的。要么使用testscheduler/historicalscheduler“虚拟化”时间,要么使用真正的时间 –