2017-07-29 105 views
1

下面是一个简单的.net核心1.1控制台应用程序。使用-r参数调用它,它读取rabbitmq队列中的所有消息,用任何其他参数调用它,并将每个参数作为消息排队。.NET Core 1.1接收RabbitMQ消息

下面是问题所在,我可以将消息排入队列,但所有尝试读取消息都不会导致消息被读取。显然,我没有正确使用队列,并希望得到一些指导。

谢谢!

using System; 
using System.Collections.Generic; 
using System.Text; 
using Newtonsoft.Json; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 

namespace RabbitMqDemo 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var client = new MessagingClient(); 
      if (args.Length == 1 && args[0].ToLower() == "-r") 
      { 
       Console.WriteLine("Reading Messages from Queue."); 
       var messages = client.ReceiveMessages(); 
       Console.WriteLine($"Read {messages.Length} message(s) from queue."); 
       foreach(var msg in messages) 
        Console.WriteLine(msg); 
      } 
      else 
      { 
       foreach (var msg in args) 
       { 
        client.SendMessage(msg); 
       } 
       Console.WriteLine($"Enqueued {args.Length} Message."); 
      } 
     } 
    } 

    internal class MessagingClient 
    { 
     private readonly ConnectionFactory connectionFactory; 
     private string ExchangeName => "defaultExchange"; 
     private string RoutingKey => ""; 
     private string QueueName => "Demo"; 

     private string HostName => "localhost"; 


     public MessagingClient() 
     { 
      this.connectionFactory = new ConnectionFactory {HostName = this.HostName}; 
     } 

     public void SendMessage(string message) 
     { 
      using (var connection = this.connectionFactory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        this.QueueDeclare(channel, this.QueueName); 
        var properties = this.SetMessageProperties(channel, message); 

        string messageJson = JsonConvert.SerializeObject(message); 
        var body = Encoding.UTF8.GetBytes(messageJson); 

        channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body); 
       } 
      } 
     } 

     public string[] ReceiveMessages() 
     { 
      var messages = new List<string>(); 
      using (var connection = this.connectionFactory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        this.QueueDeclare(channel, this.QueueName); 
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 


        var consumer = new EventingBasicConsumer(channel); 
        consumer.Received += (model, ea) => 
        {       
         string bodystring = Encoding.UTF8.GetString(ea.Body); 
         messages.Add(bodystring); 

         // ReSharper disable once AccessToDisposedClosure 
         channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 
        }; 
        channel.BasicConsume(queue: this.QueueName, autoAck: false, consumer: consumer); 
       } 
      } 
      return messages.ToArray(); 
     } 

     private void QueueDeclare(IModel channel, string queueName) 
     { 
      channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct, 
       durable: true, 
       autoDelete: false, 
       arguments: null); 

      var queueDeclared = channel.QueueDeclare(queue: queueName, 
       durable: true, 
       exclusive: false, 
       autoDelete: false, 
       arguments: null); 

      channel.QueueBind(queueName, ExchangeName, RoutingKey);    
     } 

     private IBasicProperties SetMessageProperties(IModel channel, object message) 
     { 
      var properties = channel.CreateBasicProperties(); 
      properties.ContentType = "application/json"; 
      properties.Persistent = true; 
      return properties; 
     } 

    } 
} 

回答

1
  • 首先,使用管理用户界面,以确保您的交换和队列设置正确的消息以及已发布到它。
  • 第二,ReceiveMessages()因此您的阅读器可能会立即返回一个空数组,然后事件有机会触发。当消费者收到来自RabbitMQ的消息时,您没有代码需要等待。请注意0​​如何使用Console.ReadLine()。在您的示例中,可以使用同步对象(ManualResetEvent)来防止ReceiveMessages()返回,直到读取特定的消息计数为止。
+0

男人让我感到愚蠢。当然,它不会阻止正在执行的线程。我添加了一些阻止逻辑,它像一个冠军。谢谢卢克! – cdarrigo

+0

很高兴我能帮到你 –