我试图设置一个RabbitMQ邮件队列,以便我可以发送消息来启动长时间运行的进程,并且还能够发送消息以取消长时间运行的进程(如果需要)。于是我开始了一个EventingBasicConsumer
,并没有在我的Recieved
处理程序是这样的:BasicAck异步处理邮件
if (startProcess)
{
// start a long running process
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);
这不起作用,因为EventingBasicConsumer
不是多线程的,只能同时处理一个消息。所以它不能处理取消消息,直到它完成了长时间运行的过程(在这一点上,显然没有意义)。所以接下来我试过这个:
if (startProcess)
{
Task.Run(() => {
// start a long running process
}
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);
而且这个工作。我现在可以取消长时间运行的进程......但是,我承认请求立即运行长时间运行的进程,而不是在完成之后运行。这意味着如果长时间运行的进程崩溃,则消息已被删除。所以这需要原始的发送者保持跟踪并且让接收者不得不发送消息来说完成了,这一切都变得有些复杂。
所以,我想也许我可以改变EventingBasicConsumer
只是总是在一个新线程上激发它的Received
事件。所以,我创建了这样的事情:
public class AsyncRabbitConsumer : DefaultBasicConsumer
{
// code all the same as EventingBasicConsumer except this bit:
public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
base.HandleBasicDeliver(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
if (Received != null)
{
var args = new BasicDeliverEventArgs(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
Task.Run(() =>
{
Received(this, args);
});
}
}
}
现在,在我的代码第一个片段,我可以把它处理取消的消息,而长期运行的进程仍在运行和长时间运行过程中不会ACK和删除它的消息,直到它实际完成(或取消)。所以这应该是伟大的......除非我取消我得到这个:
型“RabbitMQ.Client.Exceptions.AlreadyClosedException”的异常出现在RabbitMQ.Client.dll但在用户代码
没有处理附加信息:已关闭:AMQP操作已中断:由Peer发起的AMQP关闭原因,代码= 406,text =“PRECONDITION_FAILED-未知交付标签3”,classId = 60,methodId = 80,原因=
从channel.BasicAck
似乎是开始长时间运行的线程的步骤处理。那么这里发生了什么?我认为确认(首先取消消息,然后是长时间运行的过程消息)正在越过这里。有没有什么体面的方法来解决这个问题?还是我吠叫错了树?
可能值得注意的是,取消长时间运行的过程并不是即时的。它会在下一个方便的地方取消,所以几乎可以确定取消信息将在长时间运行过程结束之前完成处理。
@Rob:正是因为我有上面的例外。 –
对不起 - 我不好,我在那部分上看了一遍。 – Rob
你如何管理连接?它看起来像你的渠道被处置,这是什么导致你的失败。另外 - 有多少人在那里?如果您有多个人,可能您的第二个工作人员正在取消取消,并试图取消一个不存在的任务。您可以让您的工作人员监视特定频道上的取消,或指定路由键 – Rob