2016-09-07 33 views
0

假设我有经常充满一些数据的兔子队列(例如,用户提供了一些我们需要稍后分析的动作)。每秒钟新增30个项目。 我需要的是创建一个工作人员,通过这个队列来查看这个数据并执行一些任务。我可以这样做:创建php rabbit worker的最佳方法

class Worker 
{ 

    public function run() 
    { 
     $queue = new Queue('exchange', 'queue'); 
     while (true) 
     { 
      $queue->processQueue(); 
     } 
    } 
} 

而不仅仅是在服务器上运行worker.php,似乎工作。

但我想知道,如果这个无限循环会给我的兔子实例增加额外的负载,如果没有数据要继续?也许更好的办法是做像水木清华

class Worker 
{ 
    CONST IDLE = 5; 

    private $start = 0; 

    public function run() 
    { 
     $this->start = time(); 

     $queue = new Queue('exchange', 'queue'); 
     while (true) 
     { 
      $queue->processQueue(); 

      //don't allow this worker to be working a lot 
      if (time() - $this->start >= 60 * 60 - self::IDLE) 
      { 
       break; 
      } 

      sleep(self::IDLE); 
     } 

     $queue->close(); 
    } 
} 

所以我的工人便不会从兔不断,但睡眠数据一阵子。在工作一小时后,它会停止工作,另一个工作实例将被crontab作业或其他作业调用。

回答

2

为了与RabbitMQ的管理我的员工我用下面的库:

https://github.com/php-amqplib/php-amqplib

然后,我创建定义如何我的工人应该作品(包含所有RabbitMQ的逻辑),它给我一个班类似的东西:

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 

abstract class QueueAMQPConsumer 
{  
    protected $connection; 

    protected $debug; 

    protected $queueName; 

    protected $exchange; 

    public function __construct(AMQPStreamConnection $AMQPConnection, $queueName, $exchange = null) 
    { 
     $this->connection = $AMQPConnection; 
     $this->queueName = $queueName; 
     $this->exchange = $exchange; 
    } 

    public function run($debug = false) 
    { 
     $this->debug = $debug; 
     $channel = $this->connection->channel(); 
     if ($this->exchange !== null) { 
      $channel->exchange_declare($this->exchange, "topic", false, true, false); 
     } 

     $channel->queue_declare($this->queueName, false, true, false, false); 
     if ($this->exchange !== null) { 
      $channel->queue_bind($this->queueName, $this->exchange); 
     } 

     $channel->basic_qos(null, 1, null); 
     $channel->basic_consume($this->queueName, '', false, false, false, false, [$this, 'callback']); 

     while (count($channel->callbacks)) { 
      $channel->wait(); 
     } 

     $channel->close(); 
     $this->connection->close(); 
    } 


    final function callback(AMQPMessage $message) 
    { 
     $result = $this->process($message); 

     if (false === $result) { 
      $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true); 
     } else { 
      $message->delivery_info['channel']->basic_ack($message-> delivery_info['delivery_tag']); 
     } 
    } 

    /** 
    * @param AMQPMessage $message 
    * 
    * @return bool 
    */ 
    abstract protected function process(AMQPMessage $message); 
} 

该类允许设置队列,交换(在这种情况下,主题),服务质量(你可以自定义所有这些参数,它只是一个例子)等。

然后它会在回调中循环。这里的回调是抽象方法进程(...),它将在需要处理队列的不同工作人员上实现。所以的责任“循环/听”是渠道上:$channel->wait();

然后,我将创建一个需要处理队列中的消息的子类:

class MyWorker extends QueueAMQPConsumer 
{ 
    protected function process(AMQPMessage $message) 
    { 
     // .... process your message here 
    } 
} 

那么工人会听你的一直排队,并在他们到达队列时处理这些消息。 如果您的process(...)返回其他内容而不是false,则会确认该消息。

你只需要启动你的类像:

$consumer = new MyWorker(....);  
$consumer->run();