2014-09-10 296 views
0

The basic RabbitMQ tutorial给出了如何从队列中不断获取消息的示例:在C#中,如何处理当前队列中的所有RabbitMQ消息?

var factory = new ConnectionFactory() { HostName = "localhost" }; 
using (var connection = factory.CreateConnection()) 
{ 
    using (var channel = connection.CreateModel()) 
    { 
     channel.QueueDeclare("hello", false, false, false, null); 

     var consumer = new QueueingBasicConsumer(channel); 
     channel.BasicConsume("hello", true, consumer); 

     Console.WriteLine(" [*] Waiting for messages." + 
           "To exit press CTRL+C"); 
     while (true) 
     { 
      var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 

      var body = ea.Body; 
      var message = Encoding.UTF8.GetString(body); 
      Console.WriteLine(" [x] Received {0}", message); 
     } 
    } 
} 

我想要做的就是找回已被放置到队列,然后停止所有消息。

这里是如果我下午一点开始我的代码,将解决我的问题

  1. 两个例子,我想处理所有已放置到队列前下午1点的消息。

OR

  • 如果我在13:00:00开始我的代码,它需要10秒钟,我的代码运行,我不介意,如果它包括在13:00:00和13:00:10之间放置在队列中的消息,只要队列一空就停止。
  • 我意识到我可能会在我的邮件中添加一个时间戳并检查该邮件,或者我可以摆弄超时值,但我想知道是否有任何内置方法可以正确执行此操作。

    在此先感谢。

    +2

    从你的描述,它听起来就像排队是不正确的工具,队列为了缓冲消息,使得它们的生成速度比它们可以消耗的速度更快,队列不会被设计为在你决定处理它们之前持续消息。尽管RabitMQ支持持久队列,但这更多的是容错。您应该始终在队列中立即处理消息。如果您稍后想要对消息进行处理,则可以从某个持久性数据存储库(如数据库)中保存并检索它们。 – 2014-09-10 10:59:09

    +0

    感谢您的评论,本。 – 2014-09-10 11:04:36

    +0

    它听起来像一个MQ不是你的问题的解决方案。你可以做的是创建一个名为processQueue的布尔变量并将其设置为true。然后,进行处理并将该变量更新为false。 while循环将改为:while(processQueue){// ...} – 2014-09-10 11:10:41

    回答

    3

    从评论的集成功能似乎的RabbitMQ并不意味着批量处理,所以它并没有被设计用于此目的。

    我也注意到,当测试DequeueNoWait方法或尝试以零超时进行Dequeue时,它们根本不起作用,只是返回null。

    以下解决方案使用QueueDeclare得到现有消息的计数,并且不需要时间戳或哈克超时:

    var factory = new ConnectionFactory { HostName = "localhost" }; 
    using (var connection = factory.CreateConnection()) 
    { 
        using (var channel = connection.CreateModel()) 
        { 
         var queueDeclareResponse = channel.QueueDeclare(Constants.QueueName, false, false, false, null); 
    
         var consumer = new QueueingBasicConsumer(channel); 
         channel.BasicConsume(Constants.QueueName, true, consumer); 
    
         Console.WriteLine(" [*] Processing existing messages."); 
    
         for (int i = 0; i < queueDeclareResponse.MessageCount; i++) 
         { 
          var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
          var body = ea.Body; 
          var message = Encoding.UTF8.GetString(body); 
          Console.WriteLine(" [x] Received {0}", message); 
         } 
         Console.WriteLine("Finished processing {0} messages.", queueDeclareResponse.MessageCount); 
         Console.ReadLine(); 
        } 
    } 
    
    +0

    不要使用'QueueingBasicConsumer'它没有内置的恢复,使用EventingConsumer – PUG 2015-08-18 19:45:25

    1

    你可以做什么:

    1)首先时间戳添加到消息对象

    2)拒绝的消息,如果时间戳是无效的。它的RabbitMQ

    Reference

    +0

    谢谢你的回答,EvilFonti。我在我的问题中说过,“我意识到我可以在我的消息中加上时间戳并检查它,或者我可以摆弄超时值,但我想知道是否有任何内置的方法来正确执行此操作。”但出于兴趣,用什么方法来获取使用索引的元素? – 2014-09-10 12:41:55

    +0

    我很抱歉,我以为我用了一个索引来访问rabbitmq中的消息(也许这是MSMQ的情况,但我不知道了......)。那是不可能的,但你可以拒绝它 - >我编辑了答案。 – EvilFonti 2014-09-10 13:09:22

    +0

    如果在应用程序启动后没有消息发送一段时间,则此解决方案意味着必须等待超时。我想这意味着人们需要设置一个短暂的超时时间,但足够长的时间来检索现有的消息。 – 2014-09-10 13:30:35

    相关问题