2012-02-06 57 views
10

我使用的Rx一些代码,从多个线程调用,做当的Rx OnNext调用:避免重复使用SubscribeOn(Scheduler.TaskPool)

subject.OnNext(value); // where subject is Subject<T> 

我想要的值,以在后台进行处理,所以我的订阅

subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value => 
{ 
    // use value 
}); 

我真的不关心的工作投入TaskPool,并不会阻止当前线程哪个线程处理出来的可观测值,只要。但是,我在OnNext委托中使用'value'并不是线程安全的。此刻,如果很多值都经历了Observable,我会对OnNext处理函数进行重叠调用。

我可以给我的OnNext委托添加一个锁,但这不像Rx的做法。当我有多个线程调用subject.OnNext(value);时,确保我一次只有一个对我的OnNext处理函数的调用的最佳方法是什么?

回答

10

Using Subjects MSDN上

默认情况下,受试者不跨越 线程执行任何同步。 [...]但是,如果您希望 使用调度程序将传出呼叫同步到观察者,则可以使用Synchronize方法执行此操作。

所以,你应该像布兰登在评论中说的那样,将主题和手同步到你的制作者线程中。例如

var syncSubject = Subject.Synchronize(subject); 

// syncSubject.OnNext(value) can be used from multiple threads 

subscription = syncSubject.ObserveOn(TaskPoolScheduler.Default).Subscribe(value => 
{ 
    // use value 
}); 
+2

对'Synchronize'的调用应在调用ObserveOn之前进行,否则您违反了ObserveOn的并发协议。但实际上,如果用例在几个线程中共享一个主题,最好的解决方案是同步主题,而不是同步订阅者到主题:'var syncSubject = Subject.Synchronize(syncSubject);'现在手动'syncSubject'到你的制作者线程,他们可以调用'syncSubject.OnNext()'而不会导致原始主题的订阅者出现问题。 – Brandon 2013-04-16 15:58:41

+0

@Brandon var syncSubject = Subject.Synchronize(syncSubject); ......在分配之前你正在使用一个变量,你能澄清一下吗? – Beachwalker 2016-06-16 11:23:46

+0

@Beachwalker这是一个错字。它应该作为参数传递给Synchronize,如答案中所示 – Brandon 2016-06-16 11:27:29

-3

你可以尝试实现你自己的IObserver,它将在其OnNext方法中应用一个锁。只需一个简单的装饰器:应用锁定,调用内部OnNext,移除锁定。

然后你可以在IObservable上实现一个扩展方法,就像.AsThreadSafe()。

+1

不是最好的解决方案,但不需要从我的角度来看downvoted,因为它是一个有效的解决方案。所以我+1来平衡这一点与这里downvotes。 – Beachwalker 2016-03-04 07:39:12

5

我认为您正在寻找.Synchronize()扩展方法。为了在最近发布的版本中获得性能改进(2011年底),Rx团队放宽了对可观察序列生成器的连续性的假设。然而,你似乎打破了这些假设(并非坏事),但为了让Rx按照用户期望的那样回放,你应该同步序列以确保它再次连续。

1

这里多说一点为什么要用Synchronize(第二段)。 从另一方面来说,如果你在你的代码中主动使用锁定,Synchronize可能会参与死锁,至少我目睹了这种情况。