2016-11-24 139 views
5

目的:可观察到流

我使用StackExchange Redis的客户端。我的目标是从客户端公开的Pub Sub Subscriber创建一个Observable流,然后可以依次通过Observables支持订阅,其中每个订阅者都有自己的通过LINQ过滤的订阅。 (出版工作按计划进行,这个问题纯粹是围绕订阅事件流上的特定通道。)

背景:

我使用Redis的酒吧子作为事件源CQRS应用程序的一部分。具体用例是将事件发布给多个订阅者,然后更新各种阅读模型,发送电子邮件等。

这些订户都需要过滤它们处理的事件类型,为此我期待使用Rx带有LINQ的.Net(Reactive Extensions)到 在事件流上提供了一个过滤标准,以便有效地处理仅对感兴趣的事件作出反应。使用这种方法可以省去注册处理程序的事件总线实现,并且允许我通过部署1-n向系统添加新投影。每个微服务都有1-n订阅了具有自己的事件流的可观察对象具体过滤器。

我曾尝试:

1)I已经创建了一个类从ObservableBase继承,重写SubscribeCore方法,该方法从观测量接收的订阅请求,将它们存储在ConcurrentDictionary,并且作为各Redis的通知到达从通道中,遍历已注册的Observable订阅者,并调用传递RedisValue的OnNext方法。

2)我创建了一个Subject,它也接受来自Observables的订阅,并调用它们的OnNext方法。再次,受试者的使用似乎被许多人所诟病。

问题:

我试图做的功能(至少在表面上),与各种性能层面的方法,但feel like a hack,那我不是它的目的的方式使用的Rx。

我看到很多意见,应尽可能使用内置的Observable方法,例如Observable.FromEvent,但似乎不可能使用StackExchange Redis客户端订阅API,至少在我眼中。

我也明白,用于接收流,并转发到多个观察员的优选的方法是使用一个ConnectableObservable,这似乎被设计为非常场景我面对(每个微服务将在内部具有1 -n订阅观察对象)。目前,我无法理解如何将ConnectableObservable连接到来自StackExchange Redis的通知,或者如果它提供的实际好处超过了Observable

UPDATE

虽然完成是不是在我的情况的问题(处理是罚款),错误处理重要;例如隔离在一个用户中检测到的错误,以防止所有订阅终止。

回答

8

这里是你可以用它来创建一个ISubscriberIObservable<RedisValue>扩展方法和RedisChannel

public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber, RedisChannel channel) 
{ 
    return Observable.Create<RedisValue>(async (obs, ct) => 
    { 
     await subscriber.SubscribeAsync(channel, (_, message) => 
     { 
      obs.OnNext(message); 
     }).ConfigureAwait(false); 

     return Disposable.Create(() => subscriber.Unsubscribe(channel)); 
    }); 
} 

由于没有Redis的渠道完成后所产生的IObservable将永远不会完成,但您可能降IDisposable订阅取消订阅Redis频道(这将由许多Rx运营商自动完成)。

用途可能是像这样:

var subscriber = connectionMultiplexer.GetSubscriber(); 

var gotMessage = await subscriber.WhenMessageReceived("my_channel") 
    .AnyAsync(msg => msg == "expected_message") 
    .ToTask() 
    .ConfigureAwait(false); 

或按你的例子​​:

var subscriber = connectionMultiplexer.GetSubscriber(); 

var sendEmailEvents = subscriber.WhenMessageReceived("my_channel") 
    .Select(msg => ParseEventFromMessage(msg)) 
    .Where(evt => evt.Type == EventType.SendEmails); 

await sendEmailEvents.ForEachAsync(evt => 
{ 
    SendEmails(evt); 
}).ConfigureAwait(false); 

其他微服务可以过滤不同。

+0

整洁,我喜欢的方法,+1。 (在接受答案之前,我倾向于等待几天到几天,以便让其他人有机会。)如我的问题所述,您是否发现通过Observable使用ConnectableObservable有什么优势;大多数来源将ConnectableObservable描述为针对读取流的场景而设计,然后发布到多个辅助观察者。尽管在我的场景中完成并不是问题(处置很好),但错误处理很重要;隔离它们以防止所有订阅终止。 – dmcquiggin

+0

如果您想与多个观察者共享相同的订阅,那么您将需要使用'IConnectableObservable',这可以通过使用'.Publish()'或调用另一个'.Multicast()'实现来完成。 'WhenMessageReceived'的结果。它不应该改变'WhenMessageReceived'是如何实现的。 – Lukazoid

+0

通过'WhenMessageReceived'返回一个'IConnectableObservable',以及10个并发的订阅到'IConnectableObservable',是的,测试过的,性能很好。好的解决方案 – dmcquiggin