2016-04-22 130 views
3

我试图设置一个RabbitMQ邮件队列,以便我可以发送消息来启动长时间运行的进程,并且还能够发送消息以取消长时间运行的进程(如果需要)。于是我开始了一个EventingBasicConsumer,并没有在我的Recieved处理程序是这样的:BasicAck异步处理邮件

if (startProcess) 
{ 
    // start a long running process 
} 
else if (cancelProcess) 
{ 
    // cancel the currently running process 
} 
channel.BasicAck(ea.DeliveryTag, false); 

这不起作用,因为EventingBasicConsumer不是多线程的,只能同时处理一个消息。所以它不能处理取消消息,直到它完成了长时间运行的过程(在这一点上,显然没有意义)。所以接下来我试过这个:

if (startProcess) 
{ 
    Task.Run(() => { 
     // start a long running process 
    } 
} 
else if (cancelProcess) 
{ 
    // cancel the currently running process 
} 
channel.BasicAck(ea.DeliveryTag, false); 

而且这个工作。我现在可以取消长时间运行的进程......但是,我承认请求立即运行长时间运行的进程,而不是在完成之后运行。这意味着如果长时间运行的进程崩溃,则消息已被删除。所以这需要原始的发送者保持跟踪并且让接收者不得不发送消息来说完成了,这一切都变得有些复杂。

所以,我想也许我可以改变EventingBasicConsumer只是总是在一个新线程上激发它的Received事件。所以,我创建了这样的事情:

public class AsyncRabbitConsumer : DefaultBasicConsumer 
{ 
    // code all the same as EventingBasicConsumer except this bit: 
    public override void HandleBasicDeliver(string consumerTag, 
     ulong deliveryTag, 
     bool redelivered, 
     string exchange, 
     string routingKey, 
     IBasicProperties properties, 
     byte[] body) 
    { 
     base.HandleBasicDeliver(consumerTag, 
      deliveryTag, 
      redelivered, 
      exchange, 
      routingKey, 
      properties, 
      body); 
     if (Received != null) 
     { 
      var args = new BasicDeliverEventArgs(consumerTag, 
        deliveryTag, 
        redelivered, 
        exchange, 
        routingKey, 
        properties, 
        body); 

      Task.Run(() => 
      { 
       Received(this, args); 
      }); 
     } 
    } 
} 

现在,在我的代码第一个片段,我可以把它处理取消的消息,而长期运行的进程仍在运行长时间运行过程中不会ACK和删除它的消息,直到它实际完成(或取消)。所以这应该是伟大的......除非我取消我得到这个:

型“RabbitMQ.Client.Exceptions.AlreadyClosedException”的异常出现在RabbitMQ.Client.dll但在用户代码

没有处理

附加信息:已关闭:AMQP操作已中断:由Peer发起的AMQP关闭原因,代码= 406,text =“PRECONDITION_FAILED-未知交付标签3”,classId = 60,methodId = 80,原因=

channel.BasicAck似乎是开始长时间运行的线程的步骤处理。那么这里发生了什么?我认为确认(首先取消消息,然后是长时间运行的过程消息)正在越过这里。有没有什么体面的方法来解决这个问题?还是我吠叫错了树?

可能值得注意的是,取消长时间运行的过程并不是即时的。它会在下一个方便的地方取消,所以几乎可以确定取消信息将在长时间运行过程结束之前完成处理。

+0

@Rob:正是因为我有上面的例外。 –

+0

对不起 - 我不好,我在那部分上看了一遍。 – Rob

+0

你如何管理连接?它看起来像你的渠道被处置,这是什么导致你的失败。另外 - 有多少人在那里?如果您有多个人,可能您的第二个工作人员正在取消取消,并试图取消一个不存在的任务。您可以让您的工作人员监视特定频道上的取消,或指定路由键 – Rob

回答

0

你能做的就是像消费者一样 - 第一个是长时间运行的过程,第二个是杀死长时间运行过程的代理。首先会收到消息,在处理完成后处理消息和ACK,如果检测到消除信号,也会执行ACK。代理商显然会收到取消消息并杀死第一个消息,并产生第一个消息的另一个实例。显然这需要流程(消费者)在RMQ之外进行通信。

想到的另一件事(但我从来没有尝试过这样的事情)是,您在消费者中将预取计数设置为2,并且在“处理单个数据消息”时,将第二条消息发布到代理(转发),除非它是CANCEL消息,在这种情况下,在中止处理之后,您同时确认它们两个 - CANCEL和DATA(以便像这样调用)消息。

另一种选择可能是在“长时间运行的过程”中,您有两个消费者线程,每个线程都使用自己的通道。