我遇到了与Net :: AMQP :: RabbitMQ和fork()的困惑行为。如果我...发布到RabbitMQ悄悄地在父进程中失败后分叉儿童
- 建立在父进程
- 到RabbitMQ的连接发布的消息
- 叉一个孩子,并等待它退出(孩子睡觉)
- 发布消息
......第二条消息实际上并没有发送到RabbitMQ(并且没有发生错误)。我做了很多测试,包括在发送前检查$connection->is_connected()
。从我的实验中获得一些有趣的花絮:
- 如果我在发布第二条消息之前尝试打开新通道,则会挂起
$connection->open_channel($newChannelId)
调用。 - 如果我允许孩子在父母发布第二条消息的同时继续运行(并且等到之后再到
waitpid
),那么它已成功发送。
我正在寻找一种方法来检测此连接不再有效时,分叉的孩子退出,并强制断开/重新连接。我将连接缓存在系统中各种其他模块使用的perl模块中,我不知道是否/当这些其他模块fork()
并行工作。我无法可靠地设置$SIG{CHLD}
处理程序,并在收到信号时断开连接,因为其他代码可能会覆盖我的处理程序。我拥有的唯一防弹选项是放弃缓存并为每封邮件建立连接,但这会显着降低发布速度(减少30倍)。
此脚本演示了这个问题(发布到称为话题交换“广播”):
#!/usr/bin/perl
use strict;
use Net::AMQP::RabbitMQ;
use JSON -support_by_pp;
my $connection;
my $channelId = 0;
sub sendToRabbit {
my ($message) = @_;
print "Sending message $message->{message}\n";
my $contentType = 'application/json';
my $payload = encode_json $message;
$connection->publish($channelId, 'test.route', $payload, { exchange => 'broadcast', force_utf8_in_header_strings => 1 }, { 'content_type' => $contentType });
print "Sent!\n";
}
sub main {
print "Starting...\n";
$connection = Net::AMQP::RabbitMQ->new();
$connection->connect('localhost', { user => 'guest', password => 'guest', port => 5672 });
$connection->channel_open(++$channelId);
print "Connected!\n";
# send first message
sendToRabbit({ message => 'body 1' });
# fork child
my $child = fork();
if(!$child) {
# child
sleep(1);
print "child exiting...\n";
exit(0);
}
else {
# parent
waitpid($child, 0);
}
print "parent continuing...\n";
# send second message - this will not be actually sent.
sendToRabbit({ message => 'body 2' });
# allow I/O to settle...
sleep(1);
}
main();
编辑:溶液
由于池上有关解决方案的谜底!
在我的RabbitMQ管理对象中,我注入了一些代码到connect()
例程中,允许我有选择地跳过分叉子的子句,这些子类本身不会调用connect()
。这似乎具有预期的效果。
# Connect to RabbitMQ and create a channel
sub connect {
my ($self) = @_;
$self->{pid} = $$;
# if we redefined the destructor and connect is called, we need to revert
# it so it can be redefined again properly
no warnings qw(redefine);
if($self->{original_destructor}) {
# reset original destructor
*Net::AMQP::RabbitMQ::DESTROY = $self->{original_destructor};
delete $self->{original_destructor};
}
# define alternate destructor so forked children that do not call "connect" do
# not destroy our connection
{
$self->debug("Overridding constructor...");
$self->{original_destructor} = Net::AMQP::RabbitMQ->can('DESTROY');
# only destroy the connection if the current pid is the owner's pid
my $new_destructor = sub { if($self->{pid} eq $$) { $self->debug("Destroying $_[0]!\n"); $self->{original_destructor}->(); } };
*Net::AMQP::RabbitMQ::DESTROY = $new_destructor;
}
my $connection = Net::AMQP::RabbitMQ->new();
$connection->connect('localhost', { user => $self->{username}, password => $self->{password}, port => $PORT, vhost => $VHOST });
$self->{connection} = $connection;
$self->{channel} = $self->createChannel();
1;
}
谢谢你的快速和详细的答案...我想这是这样的东西,但不知道如何验证它(谢谢你的片段)。 +1。 – Voluntari