2014-02-18 1437 views
6

我们有一个windows服务,它监听单个RabbitMQ队列并处理消息。基于多线程的RabbitMQ消费者

我们想扩展相同的windows服务,以便它可以监听RabbitMQ的多个队列并处理消息。

不确定这是否可以通过使用多线程,因为每个线程将不得不列出(阻塞)队列。

由于我对多线程技术很陌生,因此需要关于以下几点的高级指南,这将有助于我开始构建原型。

  1. 是否有可能使用线程监听单个应用程序中的多个队列?
  2. 如何处理如果有任何单线程呼叫 播种(由于异常等),如何重新启动整个Windows服务而不需要重新启动 。
  3. 任何设计模式或开放源码实现,可以帮助我处理这种情况。
+0

可能是一个副本:[如何实现rabbitMQ的单消费者多队列模型](http://stackoverflow.com/q/11357512/1768303) – Noseratio

+0

@Noseratio - no我不是问单消费者多队列。将有多个队列与多个队列,但实施应完成单个窗口服务。所以,而不是为每个队列消费者编写多个Windows服务,我想编写单个的Windows服务,它将监听多个队列并处理消息。 – Mahesh

+0

我同意,这似乎并不重复。 – theMayer

回答

6

我喜欢你如何编写你的问题 - 它开始非常广泛,专注于细节。我已经成功实现了一些非常相似的东西,目前我正在开发一个开源项目,以吸取我的经验教训并将它们还给社区。不幸的是,虽然 - 我还没有整齐地打包代码,这对你无能为力!总之,要回答你的问题:

1. Is it possible to use threading for multiple queues.

答:是的,但也可以是充满陷阱的。也就是说,RabbitMQ .NET库不是那里最好的代码,我发现它是AMQP协议的一个相对繁琐的实现。最有害的警告之一是它如何处理“接收”或“消耗”行为,如果不小心可能会很容易造成死锁。幸运的是,它在API文档中有很好的说明。 建议 - 如果可以的话,使用单身连接对象。然后,在每个线程中,使用该连接创建一个新的IModel和相应的消费者。

2. How to gracefully handle exceptions in threads - 我相信这是另一个话题,我不会在这里解决它,因为有几种方法可以使用。

3. Any open-source projects? - 我喜欢EasyNetQ背后的想法,尽管我最终无论如何都摇身一变。我希望记得在我的开源项目完成时跟着回来,因为我相信它比EasyNetQ有更好的改进。

+0

你有没有碰巧打包你的代码整齐? – Ommit

+0

好问题。答案很简单。我还没有时间发布任何内容,因为我仍在测试。 – theMayer

5

您可能会感兴趣this answer非常有帮助。我对RabbitMQ的工作原理有一个非常基本的理解,但我可能会继续每个线程每个线程一个用户,就像那里建议的那样。

肯定有不止一个选项为此组织线程模型。实际实现将取决于您需要如何处理来自多个队列的消息:并行或汇总它们并序列化处理。以下代码是一个控制台应用程序,它实现了对后一种情况的模拟。它使用Task Parallel LibraryBlockingCollection类(这对于这类任务非常方便)。

using System; 
using System.Collections.Concurrent; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Console_21842880 
{ 
    class Program 
    { 
     BlockingCollection<object> _commonQueue; 

     // process an individual queue 
     void ProcessQueue(int id, BlockingCollection<object> queue, CancellationToken token) 
     { 
      while (true) 
      { 
       // observe cancellation 
       token.ThrowIfCancellationRequested(); 
       // get a message, this blocks and waits 
       var message = queue.Take(token); 

       // process this message 
       // just place it to the common queue 
       var wrapperMessage = "queue " + id + ", message: " + message; 
       _commonQueue.Add(wrapperMessage); 
      } 
     } 

     // process the common aggregated queue 
     void ProcessCommonQeueue(CancellationToken token) 
     { 
      while (true) 
      { 
       // observe cancellation 
       token.ThrowIfCancellationRequested(); 
       // this blocks and waits 

       // get a message, this blocks and waits 
       var message = _commonQueue.Take(token); 

       // process this message 
       Console.WriteLine(message.ToString()); 
      } 
     } 

     // run the whole process 
     async Task RunAsync(CancellationToken token) 
     { 
      var queues = new List<BlockingCollection<object>>(); 
      _commonQueue = new BlockingCollection<object>(); 

      // start individual queue processors 
      var tasks = Enumerable.Range(0, 4).Select((i) => 
      { 
       var queue = new BlockingCollection<object>(); 
       queues.Add(queue); 

       return Task.Factory.StartNew(
        () => ProcessQeueue(i, queue, token), 
        TaskCreationOptions.LongRunning); 
      }).ToList(); 

      // start the common queue processor 
      tasks.Add(Task.Factory.StartNew(
       () => ProcessCommonQeueue(token), 
       TaskCreationOptions.LongRunning)); 

      // start the simulators 
      tasks.AddRange(Enumerable.Range(0, 4).Select((i) => 
       SimulateMessagesAsync(queues, token))); 

      // wait for all started tasks to complete 
      await Task.WhenAll(tasks); 
     } 

     // simulate a message source 
     async Task SimulateMessagesAsync(List<BlockingCollection<object>> queues, CancellationToken token) 
     { 
      var random = new Random(Environment.TickCount); 
      while (true) 
      { 
       token.ThrowIfCancellationRequested(); 
       await Task.Delay(random.Next(100, 1000)); 
       var queue = queues[random.Next(0, queues.Count)]; 
       var message = Guid.NewGuid().ToString() + " " + DateTime.Now.ToString(); 
       queue.Add(message); 
      } 
     } 

     // entry point 
     static void Main(string[] args) 
     { 
      Console.WriteLine("Ctrl+C to stop..."); 

      var cts = new CancellationTokenSource(); 
      Console.CancelKeyPress += (s, e) => 
      { 
       // cancel upon Ctrl+C 
       e.Cancel = true; 
       cts.Cancel(); 
      }; 

      try 
      { 
       new Program().RunAsync(cts.Token).Wait(); 
      } 
      catch (Exception ex) 
      { 
       if (ex is AggregateException) 
        ex = ex.InnerException; 
       Console.WriteLine(ex.Message); 
      } 

      Console.WriteLine("Press Enter to exit"); 
      Console.ReadLine(); 
     } 
    } 
} 

另一个想法可能是使用Reactive Extensions (Rx)。如果您可以将到达的消息视为事件,并且Rx可以帮助将它们聚合为单个流。

+0

基于您的实现,我创建了一个实现。它不使用BlockingCollection集合作为消息接收,消息处理和向代理的消息确认必须在同一个通道上完成。 https://gist.github.com/mahesh-singh/9214295 不确定此实施是否是正确的实施。 – Mahesh

+0

@Mahesh,您可能想要在http://codereview.stackexchange.com上发布此信息。 – Noseratio

+0

设置新的代码审查http://codereview.stackexchange.com/questions/42836/listen-to-multiple-rabbitmq-queue-by-task-and-process-the-message – Mahesh