你在这里只是一个竞赛条件。
如果我们回来翻录的所有代码,只是
var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();
var subscription = emitter
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
Console.WriteLine("DONE.");
Console.ReadLine();
我们得到了相同的结果。 通过使用Subject<T>
,您将不会获得任何缓存行为,但OnCompleted
通知的例外情况除外。
SubscribeOn
运营商将安排在提供的IScheduler
实例上完成任何订阅工作。 在订阅Subject<T>
的情况下,几乎没有工作要做。 这几乎和将回调注册到回调列表一样简单。
安排到NewThreadScheduler
的工作将创建一个新线程,然后创建一个内部事件循环来处理计划工作。 这很快,但确实需要创建一个新线程,一个EventloopScheduler并执行上下文切换到新线程。
在您的示例中,您安排了上的OnNext
和OnCompleted
通知。 然后你SubscribeOn
与NewThreadScheduler
。 接下来,您开始处理TestScheduler
实例的所有计划工作。 这些虚拟处理的计划项目,只是迭代计划的项目,执行委托和推进虚拟时钟。 这非常快。
更具体,下面的代码是类似于你所写的内容
var newThreadScheduler = new NewThreadScheduler();
var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));
foreach (var callback in callbacks)
{
callback("one");
}
Console.WriteLine("Done");
在这里,我们只是有回调操作的列表(打电话给他们的用户或观察员)。 然后,我们在一个新线程上异步计划添加其中一个回调。 然后立即迭代回调并将字符串“one”发送给它们中的每一个。 结果是
Done
的NewThreadScheduler
只是没有获得足够的时间来启动一个新的线程,调度动作,然后执行该操作,之前主线程可以通过收集迭代。
所以有几条指导原则,我认为你没有遵循: 1)避免主题;-) 2)不要混合使用线程和单元测试。我假设TestScheduler
的存在是因为你正在测试这个。但是,您可以使用两个TestScheduler
例如背景和前景实例。
为了提供更多帮助,我将提供建议您仅从测试中删除第二个调度程序的积极指导。 在您的SubscribeOn
运营商中使用TestScheduler
实例。
接下来,我建议用TestScheduler
的可观测序列工厂方法(即CreateColdObservable
)替换主题+调度的使用。 最后,我不知道是否前进到1秒的speicifc时间获得任何东西,而不仅仅是使用Start
方法。 我认为这会减少噪音和使用魔法值1s。
var testScheduler = new TestScheduler();
var source = testScheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));
var subscription = source.SubscribeOn(testScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.Start();
Console.WriteLine("DONE.");
Console.ReadLine();
现在唯一的问题是,SubscribeOn
调用是相当多余的。
FYI:代码为NewThreadScheduler
- https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs
我什么都不知道的反应,而是看你的代码,你'newThreadScheduler'认购,但所有的工作和日程安排都对'testScheduler'触发。 – TyCobb