为了与RabbitMQ的管理我的员工我用下面的库:
https://github.com/php-amqplib/php-amqplib
然后,我创建定义如何我的工人应该作品(包含所有RabbitMQ的逻辑),它给我一个班类似的东西:
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
abstract class QueueAMQPConsumer
{
protected $connection;
protected $debug;
protected $queueName;
protected $exchange;
public function __construct(AMQPStreamConnection $AMQPConnection, $queueName, $exchange = null)
{
$this->connection = $AMQPConnection;
$this->queueName = $queueName;
$this->exchange = $exchange;
}
public function run($debug = false)
{
$this->debug = $debug;
$channel = $this->connection->channel();
if ($this->exchange !== null) {
$channel->exchange_declare($this->exchange, "topic", false, true, false);
}
$channel->queue_declare($this->queueName, false, true, false, false);
if ($this->exchange !== null) {
$channel->queue_bind($this->queueName, $this->exchange);
}
$channel->basic_qos(null, 1, null);
$channel->basic_consume($this->queueName, '', false, false, false, false, [$this, 'callback']);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$this->connection->close();
}
final function callback(AMQPMessage $message)
{
$result = $this->process($message);
if (false === $result) {
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
} else {
$message->delivery_info['channel']->basic_ack($message-> delivery_info['delivery_tag']);
}
}
/**
* @param AMQPMessage $message
*
* @return bool
*/
abstract protected function process(AMQPMessage $message);
}
该类允许设置队列,交换(在这种情况下,主题),服务质量(你可以自定义所有这些参数,它只是一个例子)等。
然后它会在回调中循环。这里的回调是抽象方法进程(...),它将在需要处理队列的不同工作人员上实现。所以的责任“循环/听”是渠道上:$channel->wait();
然后,我将创建一个需要处理队列中的消息的子类:
class MyWorker extends QueueAMQPConsumer
{
protected function process(AMQPMessage $message)
{
// .... process your message here
}
}
那么工人会听你的一直排队,并在他们到达队列时处理这些消息。 如果您的process(...)
返回其他内容而不是false,则会确认该消息。
你只需要启动你的类像:
$consumer = new MyWorker(....);
$consumer->run();