1
下面是一个简单的 .NET Core 1.1 控制台应用程序。使用 -r 参数调用它时,它会读取 RabbitMQ 队列中的所有消息;使用任何其他参数调用它时,它会将每个参数作为一条消息加入队列。(标题:.NET Core 1.1 接收 RabbitMQ 消息)
问题在于:我可以将消息排入队列,但所有读取消息的尝试都读不到任何消息。显然,我没有正确使用队列,希望能得到一些指导。
谢谢!
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitMqDemo
{
/// <summary>
/// Console entry point. When invoked with the single argument <c>-r</c>
/// (case-insensitive) it calls <see cref="MessagingClient.ReceiveMessages"/> and
/// prints the result; otherwise every command-line argument is sent as a message.
/// </summary>
class Program
{
    /// <summary>
    /// Dispatches to read mode or send mode based on the command-line arguments.
    /// </summary>
    /// <param name="args">Either exactly <c>-r</c>, or the messages to enqueue.</param>
    static void Main(string[] args)
    {
        var client = new MessagingClient();

        // Ordinal, case-insensitive comparison: avoids the culture-sensitive
        // ToLower() call and the temporary string it allocates.
        bool readMode = args.Length == 1
            && string.Equals(args[0], "-r", StringComparison.OrdinalIgnoreCase);

        if (readMode)
        {
            Console.WriteLine("Reading Messages from Queue.");
            var messages = client.ReceiveMessages();
            Console.WriteLine($"Read {messages.Length} message(s) from queue.");
            foreach (var msg in messages)
            {
                Console.WriteLine(msg);
            }
        }
        else
        {
            foreach (var msg in args)
            {
                client.SendMessage(msg);
            }
            Console.WriteLine($"Enqueued {args.Length} Message.");
        }
    }
}
/// <summary>
/// Small RabbitMQ helper that publishes string messages to, and reads them back
/// from, a single durable queue bound to a durable direct exchange.
/// A new connection and channel are opened for every call.
/// </summary>
internal class MessagingClient
{
    private readonly ConnectionFactory connectionFactory;

    private string ExchangeName => "defaultExchange";
    private string RoutingKey => "";
    private string QueueName => "Demo";
    private string HostName => "localhost";

    /// <summary>
    /// Creates a client whose connection factory targets <see cref="HostName"/>.
    /// </summary>
    public MessagingClient()
    {
        this.connectionFactory = new ConnectionFactory { HostName = this.HostName };
    }

    /// <summary>
    /// Serializes <paramref name="message"/> as JSON and publishes it to the
    /// exchange with the configured routing key, marked as persistent.
    /// </summary>
    /// <param name="message">The message text to enqueue.</param>
    public void SendMessage(string message)
    {
        using (var connection = this.connectionFactory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                this.QueueDeclare(channel, this.QueueName);
                var properties = this.SetMessageProperties(channel, message);
                string messageJson = JsonConvert.SerializeObject(message);
                var body = Encoding.UTF8.GetBytes(messageJson);
                channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body);
            }
        }
    }

    /// <summary>
    /// Drains the queue and returns the UTF-8 decoded body of every message that
    /// was available at the time of the call. Each message is acknowledged after
    /// it has been added to the result.
    /// </summary>
    /// <remarks>
    /// The previous implementation registered an <c>EventingBasicConsumer</c> via
    /// <c>BasicConsume</c>, which does not block the calling thread; the channel
    /// and connection were disposed immediately, so no delivery was ever observed.
    /// Polling with <c>BasicGet</c> is synchronous and therefore completes before
    /// the channel is closed. This method does not wait for messages that arrive
    /// later.
    /// </remarks>
    /// <returns>The message bodies in the order they were fetched; empty if the queue had none.</returns>
    public string[] ReceiveMessages()
    {
        var messages = new List<string>();
        using (var connection = this.connectionFactory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                this.QueueDeclare(channel, this.QueueName);

                // BasicGet returns null once the queue has no more ready messages.
                BasicGetResult result;
                while ((result = channel.BasicGet(queue: this.QueueName, autoAck: false)) != null)
                {
                    messages.Add(Encoding.UTF8.GetString(result.Body));
                    channel.BasicAck(deliveryTag: result.DeliveryTag, multiple: false);
                }
            }
        }
        return messages.ToArray();
    }

    /// <summary>
    /// Declares the durable direct exchange and the durable, non-exclusive,
    /// non-auto-delete queue, then binds the queue to the exchange.
    /// </summary>
    /// <param name="channel">The channel to issue the declarations on.</param>
    /// <param name="queueName">The name of the queue to declare and bind.</param>
    private void QueueDeclare(IModel channel, string queueName)
    {
        channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct,
            durable: true,
            autoDelete: false,
            arguments: null);
        channel.QueueDeclare(queue: queueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);
        channel.QueueBind(queueName, ExchangeName, RoutingKey);
    }

    /// <summary>
    /// Builds the publish properties: JSON content type and persistent delivery.
    /// </summary>
    /// <param name="channel">The channel used to create the properties object.</param>
    /// <param name="message">Currently unused.</param>
    /// <returns>The populated message properties.</returns>
    private IBasicProperties SetMessageProperties(IModel channel, object message)
    {
        var properties = channel.CreateBasicProperties();
        properties.ContentType = "application/json";
        properties.Persistent = true;
        return properties;
    }
}
}
天哪,我觉得自己真蠢。当然,它不会阻塞正在执行的线程。我添加了一些阻塞逻辑,现在运行得非常好。谢谢卢克! – cdarrigo
很高兴我能帮到你 –