2017-04-22 78 views
1

我想以下情形使用AMQP保持通过<code>amqp</code>一个长期连接到远程工作者

服务器调度“开始”行动工作进程模型,具体如下(假设channelaction是以前提供和行动为START一些有效载荷。)

channel.assertQueue('', { exclusive: true }).then(({ queue }) => { 
    const cId = uuid() 

    channel.consume(queue, (msg) => { 
    if (msg.properties.correlationId === cId) { 
     const response = JSON.parse(msg.content.toString()) 
     console.log('response', response) 
     resolve(response) 
    } 
    }, { noAck: true }) 

    const msg = JSON.stringify(action) 
    channel.sendToQueue(
    QUEUE_NAME, 
    new Buffer(msg), 
    { correlationId: cId, replyTo: queue } 
) 
}, reject) 

工人得到START actioncorrelationIdreplyTo队列名称一起,增加了有效载荷的事情要做自己的内部列表,并响应到“0123_S”队列并执行“S​​TART_SUCCESS”操作。

现在,工作人员将通过其内部要做的事情列表并执行它们,并通过相同的replyTo队列向服务器发出“更新”操作,因此服务器需要知道要继续收听到该队列进行更新,并且需要知道哪个工作人员正在处理任何特定任务的更新。服务器足够聪明,可以知道某个特定任务已经启动,因此在这种情况下不会重新分派。

但是,当它的时间为工人停止做任务,需要服务器知道要发送一个“STOP”消息给工人。有没有办法让工作人员向服务器发送某种直接的amqp通道给服务器,以便服务器可以使用它发送STOP消息?

回答

1

最简单的答案似乎是让工作人员创建一个“回复”队列,然后在“START_SUCCESS”消息中将该标识符发送到服务器,并将该标识符发送到位于某处的服务器存储区。

不过,我觉得很多的RabbitMQ的力量来自于一个事实,即消息不直接发布到队列,而是交流,他们的最终目标是通过自己的路由键确定。 (按队列名称发布实际上是通过使用路由密钥作为队列名称的交换机。)如果您不熟悉不同类型的交换,请阅读the RabbitMQ Getting Started tutorials

在这种情况下,不要想到服务器和工作人员需要知道对方的身份,你可以考虑发布和订阅对方的更新。如果一切都发布到交易所,那么服务器和工作人员实际上并不需要知道任何关于彼此身份的信息。

以下是我看到的工作:

  1. 服务器生成特定作业的唯一ID。
  2. 服务器发布一个开始消息到交换jobs.new,与所述消息中的路由密钥进行分类的作业的类型,和作业ID。
  3. 服务器将绑定密钥设置为作业ID,将匿名队列绑定到直接或主题交换jobs.status
  4. 工人启动并从jobs.ready(或jobs.ready.some_type)接收一条消息。
  5. 工作人员将一个匿名队列绑定到jobs.control交换,作业ID为绑定密钥。
  6. 工作人员启动任务,并将START_SUCCESS消息发布到交换jobs.status,并将作业ID作为路由密钥。
  7. 服务器接收从它结合在步骤3中的队列中的消息START_SUCCESS,并更新其状态该作业。
  8. 工人周期性地发送更新消息到交换jobs.status;再次,路由密钥与作业ID相匹配,因此服务器会收到该消息。
  9. 当服务器要停止(或修改)正在运行的作业,它发布一个STOP消息与作业ID作为路由键jobs.control交换。
  10. 工作人员在步骤5中绑定的队列上接收到此消息,并停止作业。

从RabbitMQ的侧面看,你有这些元素:

  • 3交流:
    • jobs.new其中服务器发布新的就业机会。如果所有工作人员都可以处理所有工作,这可能是一个简单的扇出交换,或者它可能是一个话题交换,它将其路由到不同类型的工作人员的不同工作队列中。
    • jobs.status其中更新由工人出版。这将是直接或主题交换,其路由键是或包含作业ID。
    • jobs.control其中更新由服务器发布到控制现有就业岗位。再次,这将是直接或主题交换,其路由键是或包含作业ID。
  • 永久队列:
    • 单个jobs.ready队列,或不同jobs.ready.some_type队列,结合到jobs.new交换。
  • 匿名队列: 每个服务器创建和绑定使用该作业的ID的jobs.status交换工作
    • 一个队列。另外,服务器进程可以有一个入站流量队列,只需从接收到的消息中读取作业ID即可。每由工人创建的工作
    • 一个队列,并绑定到使用它当前正在处理的作业的ID的jobs.control交换。

注意,您可以附加额外的队列来这些交流,以获得流量,例如副本用于记录或调试。对于主题交换,只需绑定额外的队列和绑定密钥#,它将获得所有消息的副本,而不会中断任何现有的绑定。

+0

非常感谢。这是一个很棒的答案。 –

相关问题