2017-05-14 115 views
0

我有共享库作为总线,我试图从rabbitmq接收邮件,但ConsumerOnReceived永远不会被触发。收到不会触发

namespace Bus 
{ 
    public class MessageListener 
    { 

    private static IConnection _connection; 
    private static IModel _channel; 

    public void Start(string hostName, int port, string queueName) 
    { 
     var factory = new ConnectionFactory() { HostName = hostName, Port = port }; 
     using (var connection = factory.CreateConnection()) 
     using (var channel = connection.CreateModel()) 
     { 
      channel.QueueDeclare(queue: queueName, 
           durable: false, 
           exclusive: false, 
           autoDelete: false, 
           arguments: null); 

      var consumer = new EventingBasicConsumer(channel); 
      consumer.Received += ConsumerOnReceived; 

      channel.BasicConsume(queue: queueName, 
           noAck: true, 
           consumer: consumer); 
     } 
    } 

    public static void Stop() 
    { 
     _channel.Close(200, "Goodbye"); 
     _connection.Close(); 
    } 

    public virtual void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) 
    { 
     var body = ea.Body; 
     var message = Encoding.UTF8.GetString(body); 
    } 
} 

public static class MessageSender 
{ 
    public static void Send(string hostName, int port, string queueName, string message) 
    { 
     var factory = new ConnectionFactory() { HostName = hostName, Port = port }; 
     using (var connection = factory.CreateConnection()) 
     using (var channel = connection.CreateModel()) 
     { 
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); 

      var body = Encoding.UTF8.GetBytes(message.ToString()); 

      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body); 
     } 
    } 
} 

} 

核心

namespace Core 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      new MessageListener().Start("localhost", 5672, "MakePayment"); 

      Console.WriteLine("Core Service"); 
      string line = Console.ReadLine(); 
     } 

    } 
} 

namespace Core 
{ 
    public class MessageListener : Bus.MessageListener 
    { 
     public override void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) 
     { 
      var body = ea.Body; 
      var message = Encoding.UTF8.GetString(body); 
     } 
    } 

} 

回答

1

的问题是在这里

channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); 

然而,BasicConsume不是阻塞方法,因此,当你调用Start您创建一个连接和一个通道,但随后得到立即处理。

以下是不是一个解决方案,但您可以通过以下确认:

channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); 
Console.ReadKey();//←Added Line 

你的程序会以这种方式工作。

这是我提出的解决方案。请大家注意,_channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);将开始在另一个线程,这样你就不需要使用while(...)

using System; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 

namespace Bus { 

    public abstract class BaseMessageListener { 
     private static IModel _channel; 
     private static IConnection _connection; 

     public abstract void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea); 

     public void Start(string hostName, int port, string queueName) { 
      var factory = new ConnectionFactory() { HostName = hostName, Port = port }; 
      _connection = factory.CreateConnection(); 
      _channel = _connection.CreateModel(); 
      _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false); 
      var consumer = new EventingBasicConsumer(_channel); 
      consumer.Received += ConsumerOnReceived; 
      _channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);//This will start another thread! 
     } 

     public void Stop() { 
      _channel.Close(200, "Goodbye"); 
      _connection.Close(); 
     } 
    } 
} 

namespace StackOverfFLow.RabbitMQSolution { 

    using Bus; 

    public class MessageListener : BaseMessageListener { 

     public override void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) { 
      var body = ea.Body; 
      var message = Encoding.UTF8.GetString(body); 
      Console.WriteLine(message); 
     } 
    } 

    internal class Program { 

     private static void Main(string[] args) { 
      var listener = new MessageListener(); 
      listener.Start("localhost", 5672, "MakePayment"); 
      Console.WriteLine("Core Service Started!"); 
      Console.ReadKey(); 
      listener.Stop(); 
     } 
    } 
} 
+0

这个工作的确但有一个更好的做法则ReadKey来处理呢? – Mert

+0

我用“while(_channel.IsOpen){}”解决了这个问题,“ – Mert

+1

@Mert,我不确定你把'while(_channel.IsOpen){}'放在哪里,但是如果你把它放在'channel.BasicConsume(queue: queueName,noAck:true,consumer:consumer);'这一行;我认为你不需要那个。请看我更新的答案。 –