目的:可观察到流
我使用StackExchange Redis的客户端。我的目标是从客户端公开的Pub Sub Subscriber创建一个Observable流,然后可以依次通过Observables支持订阅,其中每个订阅者都有自己的通过LINQ过滤的订阅。 (出版工作按计划进行,这个问题纯粹是围绕订阅事件流上的特定通道。)
背景:
我使用Redis的酒吧子作为事件源CQRS应用程序的一部分。具体用例是将事件发布给多个订阅者,然后更新各种阅读模型,发送电子邮件等。
这些订户都需要过滤它们处理的事件类型,为此我期待使用Rx带有LINQ的.Net(Reactive Extensions)到 在事件流上提供了一个过滤标准,以便有效地处理仅对感兴趣的事件作出反应。使用这种方法可以省去注册处理程序的事件总线实现,并且允许我通过部署1-n向系统添加新投影。每个微服务都有1-n订阅了具有自己的事件流的可观察对象具体过滤器。
我曾尝试:
1)I已经创建了一个类从ObservableBase继承,重写SubscribeCore方法,该方法从观测量接收的订阅请求,将它们存储在ConcurrentDictionary,并且作为各Redis的通知到达从通道中,遍历已注册的Observable订阅者,并调用传递RedisValue的OnNext方法。
2)我创建了一个Subject,它也接受来自Observables的订阅,并调用它们的OnNext方法。再次,受试者的使用似乎被许多人所诟病。
问题:
我试图做的功能(至少在表面上),与各种性能层面的方法,但feel like a hack
,那我不是它的目的的方式使用的Rx。
我看到很多意见,应尽可能使用内置的Observable方法,例如Observable.FromEvent,但似乎不可能使用StackExchange Redis客户端订阅API,至少在我眼中。
我也明白,用于接收流,并转发到多个观察员的优选的方法是使用一个ConnectableObservable,这似乎被设计为非常场景我面对(每个微服务将在内部具有1 -n订阅观察对象)。目前,我无法理解如何将ConnectableObservable连接到来自StackExchange Redis的通知,或者如果它提供的实际好处超过了Observable。
UPDATE:
虽然完成是不是在我的情况的问题(处理是罚款),错误处理为重要;例如隔离在一个用户中检测到的错误,以防止所有订阅终止。
整洁,我喜欢的方法,+1。 (在接受答案之前,我倾向于等待几天到几天,以便让其他人有机会。)如我的问题所述,您是否发现通过Observable使用ConnectableObservable有什么优势;大多数来源将ConnectableObservable描述为针对读取流的场景而设计,然后发布到多个辅助观察者。尽管在我的场景中完成并不是问题(处置很好),但错误处理很重要;隔离它们以防止所有订阅终止。 – dmcquiggin
如果您想与多个观察者共享相同的订阅,那么您将需要使用'IConnectableObservable',这可以通过使用'.Publish()'或调用另一个'.Multicast()'实现来完成。 'WhenMessageReceived'的结果。它不应该改变'WhenMessageReceived'是如何实现的。 – Lukazoid
通过'WhenMessageReceived'返回一个'IConnectableObservable',以及10个并发的订阅到'IConnectableObservable',是的,测试过的,性能很好。好的解决方案 – dmcquiggin