2012-02-08 135 views
4

我正在从我们的ASP.NET站点删除我们的电子邮件系统,该站点用于立即用系统发送电子邮件,以便在单独的服务中处理请求以减少网站的工作量。我试图围绕一组接口进行设计,以便我可以交换实现(如果需要的话),但最初它将基于消息队列(MSMQ)将请求发送到队列,让服务接收传入的请求然后处理它们。我现在有一个粗略的定义以下接口:如何创建从MSMQ消息队列中读取的IObservable <T>?

// Sends one or more requests to be processed somehow 
public interface IRequestSender 
{ 
    void Send(IEnumerable<Request> requests); 
} 

// Listens for incoming requests and passes them to an observer to do the real work 
public interface IRequestListener : IObservable<Request> 
{ 
    void Start(); 
    void Stop(); 
} 

// Processes a request given to it by a IRequestListener 
public interface IRequestProcessor : IObserver<Request> 
{ 
} 

你会发现,监听器和处理器使用可观察的模式,因为这是我认为似乎适合最好。

我的问题是搞清楚如何编写从MSMQ接收,基本上我怎么创建一个合适的IObservable<T>IRequestListener的实现?

我发现的第一个选择是根据MSDN documentation给出的例子从零开始创建一个IObservable<T>,但这看起来像是很多管道工作要做的事情。

另一种选择是使用Reactive Extensions,因为它似乎被设计成使创建observable更容易。我发现使用的Rx与MSMQ最接近的是这些网页:

但我不知道我怎么可以将这些例子我IRequestListener接口。

任何其他的想法也欢迎,甚至如果他们适合我的基本设计的变化。

回答

5

我最初使用FromAsyncPattern,但后来最终为它写了一个类,因为它更好地处理了超时和中毒消息。一旦开始,队列无论如何都是热点观察对象。您也可以使用Observable.Defer使其更接近Rx而不是启动/停止。

以下是QueueObservable的基本实现。您可以通过致电ListenReceive开始。

Subject<T> Subject = new Subject<T>(); 

protected void ListenReceive() 
{ 
    Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive); 
} 

protected void OnReceive(IAsyncResult ar) 
{ 
    Message message = null; 

    try 
    { 
     message = Queue.EndReceive(ar); 
    } 
    catch (TimeoutException ex) 
    { 
     //retry? 
    } 

    if (message != null) 
     Subject.OnNext((T) message.Body); 

    Thread.Yield(); 

    if (!IsDisposed) 
     ListenReceive(); 
}  

public IObservable<T> AsObservable() 
{ 
     return Subject; 
} 
+0

我在做沿着相同的路线,因为这一些尝试看到这个答案之前,使用'主题'内部并帮助跟踪订阅和我的实现是从您的建议不是一个百万英里的路程,谢谢。 – 2012-02-10 11:07:12

+0

@PeterMonks没问题。 – Asti 2012-02-10 13:12:17