2014-09-29 128 views
2

我正在设置一个标准的独立线程,在C#中监听RabbitMQ。假设在线程监听的方法是这样的:在C#客户端为RabbitMQ正常停止消息消息的优雅方式是什么?

public void Listen() 
{ 
    using (var channel = connection.CreateModel()) 
    { 
     var consumer = SetupQueues(channel); 
     while (true) 
     { 
      var ea = consumer.Queue.Dequeue(); // blocking call 
      handler.HandleMessage(channel, ea); 
     } 
    } 
} 

什么是对的RabbitMQ C#的客户端正常停止的消息消费的一个优雅的方式?请记住,我没有发现任何利用在RabbitMQ的例子/文档或这些所谓的问题:

这里的问题是consumer.Queue.Dequeue()是一个阻塞调用。我试过这些选项:

  • 打电话channel.BasicCancel(string tag)。这导致阻塞呼叫中的System.IO.EndOfStreamException。我不想将这个异常作为控制流程的一部分,原因很明显。

  • 调用consumer.Queue.Dequeue(int millisecondsTimeout, out T result)并检查循环迭代之间的标志。这可以工作,但似乎hacky。

我想平稳地让线程退出并清理所有非托管资源,我可能有,所以没有线程中止,等

任何帮助表示赞赏。谢谢

+1

据我所知,没有“干净”的方式来做到这一点。我个人使用超时方法 - 我的消费者线程检查一个标志,并根据此标志决定要执行的操作。这实际上似乎工作正常。从本质上讲,应用程序的状态决定了队列是否应该被消费 - 并且消费者循环只是在它尝试之前检查它是否应该消耗。 – Beardy 2014-09-29 17:17:12

回答

1

DeQueue与超时&标志是做到这一点的方法。这是一种非常常见的模式,这就是为什么许多阻塞呼叫提供了启用超时的版本。

或者,抛出(已知)异常并不一定是控制流的坏处。优雅地关闭可能意味着实际上陷入异常,评论“请求频道关闭时引发这种情况”,然后干净地返回。这是TPL如何与CancellationToken一起工作的。

+0

了解,谢谢澄清。就使用异常而言,我很乐意使用它,但是,当'SharedQueue'被其他方法关闭时(网络故障,心跳),将抛出相同的异常('System.IO.EndOfStreamException:SharedQueue closed')错过了等) – Ralphie 2014-09-30 19:44:42

+0

是的,这是不幸的。 TPL以同样的方式执行,他们抛出一个ThreadAbortException,这可能是由于各种原因而发生的。我从来不明白为什么 - 开发人员创建异常很便宜,为什么他们不使用更多的面向对象? :( – 2014-09-30 20:42:52

1

阻塞方法不属性事件驱动。 我不知道为什么他们建议使用consumer.Queue.Dequeue();

无论如何,我通常不使用consumer.Queue.Dequeue();

我扩展默认的消费者来说,以这样的方式

class MyConsumer : DefaultBasicConsumer { 

    public MyConsumer(IModel model):base(model) 
    { 

    } 
    public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) { 
    var message = Encoding.UTF8.GetString(body); 
    Console.WriteLine(" [x] Received {0}", message); 
    } 
} 


class Program 
{ 
    static void Main(string[] args) 
    { 

    var factory = new ConnectionFactory() { Uri = "amqp://aa:[email protected]/xxx" }; 
    using (var connection = factory.CreateConnection()) 
    { 
     using (var channel = connection.CreateModel()) 
     { 
     channel.QueueDeclare("hello", false, false, false, null); 
     var consumer = new MyConsumer(channel); 
     String tag = channel.BasicConsume("hello", true, consumer); 
     Console.WriteLine(" [*] Waiting for messages." + 
           " any key to exit"); 
     Console.ReadLine(); 
     channel.BasicCancel(tag); 


     /*while (true) 
     { 
      /////// DON'T USE THIS 
      var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
      var body = ea.Body; 
      var message = Encoding.UTF8.GetString(body); 
      Console.WriteLine(" [x] Received {0}", message); 
     }*/ 
     } 
    } 


    } 
} 

这样你不没有阻止方法,您可以正确释放所有资源。

编辑

我认为使用CTRL +Ç,打破了计划永远是错的。

相关问题