1
这是事情。如何从AMQP(RabbitMQ)队列中删除消息?
我从RabbitMQ的为了处理上发送每封电子邮件的重要信息使用PHP AMQP读取结果队列。完成后,我需要删除或标记为已写入的消息,以便下次读队列时我不会收到已处理的消息。
由于Rabbitmq服务器每小时发送超过10,000封电子邮件,每当我读取队列以处理结果发送时,脚本可以运行至少5分钟以处理队列中的所有邮件,所以在它之后已经完成,在这5分钟内有数百个新消息被放置。这使得我无法在脚本完成后清除队列,因为它会在脚本运行期间删除未处理的脚本。
这给我留下了只有一个选择。在AMQP脚本处理或读取之后立即标记或删除消息。
有没有办法做到这一点? (这里是脚本)
<?php
/**
* result.php
* Script that connects to RabbitMQ, and takes the result message from
* the result message queue.
*/
// include the settings
require_once('settings.php');
// try to set up a connection to the RabbitMQ server
try
{
// construct the connection to the RabbitMQ server
$connection = new AMQPConnection(array(
'host' => $hostname,
'login' => $username,
'password' => $password,
'vhost' => $vhost
));
// connect to the RabbitMQ server
$connection->connect();
}
catch (AMQPException $exception)
{
echo "Could not establish a connection to the RabbitMQ server.\n";
}
// try to create the channel
try
{
// open the channel
$channel = new AMQPChannel($connection);
}
catch (AMQPConnectionException $exception)
{
echo "Connection to the broker was lost (creating channel).\n";
}
// try to create the queue
try
{
// create the queue and bind the exchange
$queue = new AMQPQueue($channel);
$queue->setName($resultbox);
$queue->setFlags(AMQP_DURABLE);
$queue->bind('exchange1', 'key1');
$queue->declare();
}
catch (AMQPQueueException $exception)
{
echo "Channel is not connected to a broker (creating queue).\n";
}
catch (AMQPConnectionException $exception)
{
echo "Connection to the broker was lost. (creating queue)/\n";
}
// Get the message from the queue.
while ($envelope = $queue->get()) {
//Function that processes the message
process_message($envelope->getBody());
}
$queue->purge();
// done, close the connection to RabbitMQ
$connection->disconnect();
?>
我如何能实现它的PHP代码?谢谢 – rodvela
用代码更新了我的答案。我相信你已经阅读过,但以防万一,这里是官方文档http://php.net/manual/en/book.amqp.php,它在某些部分有些过时,但仍然很好理解如何使用php-amqp并且有一些你可能会觉得有用的例子 – pinepain
ps看看这里的例子https://github.com/pinepain/amqpy/tree/master/demo/canonical – pinepain