2017-02-14 93 views
1

我遇到了与Net :: AMQP :: RabbitMQ和fork()的困惑行为。如果我...发布到RabbitMQ悄悄地在父进程中失败后分叉儿童

  1. 建立在父进程
  2. 到RabbitMQ的连接发布的消息
  3. 叉一个孩子,并等待它退出(孩子睡觉)
  4. 发布消息

......第二条消息实际上并没有发送到RabbitMQ(并且没有发生错误)。我做了很多测试,包括在发送前检查$connection->is_connected()。从我的实验中获得一些有趣的花絮:

  • 如果我在发布第二条消息之前尝试打开新通道,则会挂起$connection->open_channel($newChannelId)调用。
  • 如果我允许孩子在父母发布第二条消息的同时继续运行(并且等到之后再到waitpid),那么它已成功发送。

我正在寻找一种方法来检测此连接不再有效时,分叉的孩子退出,并强制断开/重新连接。我将连接缓存在系统中各种其他模块使用的perl模块中,我不知道是否/当这些其他模块fork()并行工作。我无法可靠地设置$SIG{CHLD}处理程序,并在收到信号时断开连接,因为其他代码可能会覆盖我的处理程序。我拥有的唯一防弹选项是放弃缓存并为每封邮件建立连接,但这会显着降低发布速度(减少30倍)。

此脚本演示了这个问题(发布到称为话题交换“广播”):

#!/usr/bin/perl 
use strict; 
use Net::AMQP::RabbitMQ; 
use JSON -support_by_pp; 

my $connection; 
my $channelId = 0; 

sub sendToRabbit { 
    my ($message) = @_; 
    print "Sending message $message->{message}\n"; 
    my $contentType = 'application/json'; 
    my $payload = encode_json $message; 
    $connection->publish($channelId, 'test.route', $payload, { exchange => 'broadcast', force_utf8_in_header_strings => 1 }, { 'content_type' => $contentType }); 
    print "Sent!\n"; 
} 

sub main { 
    print "Starting...\n"; 

    $connection = Net::AMQP::RabbitMQ->new(); 
    $connection->connect('localhost', { user => 'guest', password => 'guest', port => 5672 }); 
    $connection->channel_open(++$channelId); 
    print "Connected!\n"; 

    # send first message 
    sendToRabbit({ message => 'body 1' }); 

    # fork child 
    my $child = fork(); 
    if(!$child) { 
    # child 
    sleep(1); 
    print "child exiting...\n"; 
    exit(0); 
    } 
    else { 
    # parent 
    waitpid($child, 0); 
    } 
    print "parent continuing...\n"; 

    # send second message - this will not be actually sent. 
    sendToRabbit({ message => 'body 2' }); 

    # allow I/O to settle... 
    sleep(1); 
} 

main(); 

编辑:溶液

由于池上有关解决方案的谜底!

在我的RabbitMQ管理对象中,我注入了一些代码到connect()例程中,允许我有选择地跳过分叉子的子句,这些子类本身不会调用connect()。这似乎具有预期的效果。

# Connect to RabbitMQ and create a channel 
sub connect { 
    my ($self) = @_; 

    $self->{pid} = $$; 

    # if we redefined the destructor and connect is called, we need to revert 
    # it so it can be redefined again properly 
    no warnings qw(redefine); 
    if($self->{original_destructor}) { 
    # reset original destructor 
    *Net::AMQP::RabbitMQ::DESTROY = $self->{original_destructor}; 
    delete $self->{original_destructor}; 
    } 

    # define alternate destructor so forked children that do not call "connect" do 
    # not destroy our connection 
    { 
    $self->debug("Overridding constructor..."); 
    $self->{original_destructor} = Net::AMQP::RabbitMQ->can('DESTROY'); 
    # only destroy the connection if the current pid is the owner's pid 
    my $new_destructor = sub { if($self->{pid} eq $$) { $self->debug("Destroying $_[0]!\n"); $self->{original_destructor}->(); } }; 
    *Net::AMQP::RabbitMQ::DESTROY = $new_destructor; 
    } 

    my $connection = Net::AMQP::RabbitMQ->new(); 
    $connection->connect('localhost', { user => $self->{username}, password => $self->{password}, port => $PORT, vhost => $VHOST }); 
    $self->{connection} = $connection; 
    $self->{channel} = $self->createChannel(); 

    1; 
} 

回答

2

孩子是父母的克隆,并且文件处理父母与孩子共享。作为父母的副本,孩子拥有$connection的副本。当孩子退出时,这个对象被销毁,调用它的析构函数,向RabbitMQ发送命令关闭连接。

您可以通过添加

{ 
    my $old_destructor = Net::AMQP::RabbitMQ->can('DESTROY'); 
    my $new_destructor = sub { print("Destroying $_[0]!\n"); $old_destructor->(); }; 
    no warnings qw(redefine); 
    *Net::AMQP::RabbitMQ::DESTROY = $new_destructor; 
} 

可能的解决方案看到这一点:

  • 移动子代码到一个单独的文件,并exec该文件。
  • 将子代码移动到使用“secret”参数调用脚本时调用的子代码中,并让孩子使用该参数通过exec重新启动。
  • 更快地创建孩子。具体来说,在创建RabbitMQ连接之前创建它。
  • 创建一个孩子来做RabbitMQ的东西。
  • 使用线程而不是子进程。

PS —不要写自己fork + exec代码。至少使用open3

sub spawn { 
    open(local *CHILD_STDIN, '<', '/dev/null') or die $!; 
    return open3('<&CHILD_STDIN', '>&STDOUT', '>&STDERR', @_); 
} 

sendToRabbit({ message => 'body 1' }); 

my $pid = spawn('child.pl');  
waitpid($pid, 0); 

sendToRabbit({ message => 'body 2' }); 
+0

谢谢你的快速和详细的答案...我想这是这样的东西,但不知道如何验证它(谢谢你的片段)。 +1。 – Voluntari