我使用的Rx一些代码,从多个线程调用,做当的Rx OnNext调用:避免重复使用SubscribeOn(Scheduler.TaskPool)
subject.OnNext(value); // where subject is Subject<T>
我想要的值,以在后台进行处理,所以我的订阅
subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
// use value
});
我真的不关心的工作投入TaskPool,并不会阻止当前线程哪个线程处理出来的可观测值,只要。但是,我在OnNext委托中使用'value'并不是线程安全的。此刻,如果很多值都经历了Observable,我会对OnNext处理函数进行重叠调用。
我可以给我的OnNext委托添加一个锁,但这不像Rx的做法。当我有多个线程调用subject.OnNext(value);
时,确保我一次只有一个对我的OnNext处理函数的调用的最佳方法是什么?
对'Synchronize'的调用应在调用ObserveOn之前进行,否则您违反了ObserveOn的并发协议。但实际上,如果用例在几个线程中共享一个主题,最好的解决方案是同步主题,而不是同步订阅者到主题:'var syncSubject = Subject.Synchronize(syncSubject);'现在手动'syncSubject'到你的制作者线程,他们可以调用'syncSubject.OnNext()'而不会导致原始主题的订阅者出现问题。 – Brandon 2013-04-16 15:58:41
@Brandon var syncSubject = Subject.Synchronize(syncSubject); ......在分配之前你正在使用一个变量,你能澄清一下吗? – Beachwalker 2016-06-16 11:23:46
@Beachwalker这是一个错字。它应该作为参数传递给Synchronize,如答案中所示 – Brandon 2016-06-16 11:27:29