2017-02-23 67 views
0

我有一个3 beanstalkd队列进程运行在相同的ip但不同的端口上。我有一个单独的服务器运行主管并行产生php工作(每个beanstalkd端口20)来处理队列。我的问题是,似乎两个进程可以在同一台服务器上同时保留相同的作业ID。Pheanstalk保留相同的beanstalkd作业两次或更多

下面是我的一些日志输出样本:

2017-02-23 09:59:56 --> START JOB (port: 11301 | u: 0.45138600 1487861996 | jid:1695074 | pid:30019 | j:leads_to_tags_add | tr:1) 
2017-02-23 09:59:57 --> START JOB (port: 11301 | u: 0.55024800 1487861997 | jid:1695074 | pid:30157 | j:leads_to_tags_add | tr:2) 
2017-02-23 09:59:58 --> DEL JOB (port: 11301 | u: 0.54731000 1487861998 | jid:1695074 | pid:30019 | j:leads_to_tags_add) 
2017-02-23 09:59:58 --> DEL JOB (port: 11301 | u: 0.58927900 1487861998 | jid:1695074 | pid:30157 | j:leads_to_tags_add) 

似乎右后两个相互储量发生,第一个进程完成之前,第2贮存发生,删除作业。

我在每个jobid的redis中添加了一个计数器,很明显,当它保留第二次时,计数器上升一次(tr)。 TTRR设置为3600,因此在第一个过程完成之前,它无法到期。

这是什么工作状态看起来像右边的第二个过程后备用:

Pheanstalk\Response\ArrayResponse::__set_state(array(
    'id' => '1695074', 
    'tube' => 'action-medium', 
    'state' => 'reserved', 
    'pri' => '0', 
    'age' => '1', 
    'delay' => '0', 
    'ttr' => '3600', 
    'time-left' => '3599', 
    'file' => '385', 
    'reserves' => '2', 
    'timeouts' => '0', 
    'releases' => '0', 
    'buries' => '0', 
    'kicks' => '0', 
)) 

这种行为是很随意的,有时只有一个进程将能够保留,直到作业锁,有时2,有时甚至4个或更多(很少)。当然,这会造成执行次数不一致的重复作业。

短版本的代码:

$this->job = $this->pheanstalk->watch($tube)->reserve($timeout); 


set_error_handler(function($errno, $errstr, $errfile, $errline, array $errcontext) { 
    // error was suppressed with the @-operator 
    if (0 === error_reporting()) { 
     return false; 
    } 

    throw new ErrorException($errstr, 0, $errno, $errfile, $errline); 
}); 

$this->log_message('info', __METHOD__ . ": START JOB (" . $this->_logDetails() . " | tr:{$tries})"); 


if ($this->_process_job()) { 
    $this->log_message('info', __METHOD__ . ": FINISHED JOB (" . $this->_logDetails() . ")"); 
    $this->_delete_job(); 
} else { 
    $this->log_message('error', __METHOD__ . ": FAILED JOB (" . $this->_logDetails() . ")"); 
} 

restore_error_handler(); 

protected function _delete_job() 
{ 
    $this->pheanstalk->delete($this->job); 
    $this->log_message('info', __METHOD__ . ": DELETED JOB (" . $this->_logDetails() . ")"); 
} 
+0

请将您的代码发布到“预定”作业,发送给作品,然后在完成时删除它。 –

+0

@AlisterBulman更新了一些代码 – pierdevara

+0

只有一个Beanstalkd服务器时问题是否仍然存在?所有的服务器都有不同的二进制日志等配置,为什么还要使用多个配置? –

回答

0

的问题是一个TCP连接的我的代码process_job位之内发生的事情。 pheanstalk对象将被覆盖在执行链的某个地方。

我有一个库包装pheanstalk在发送put命令之前检查可用的最空的beanstalkd服务器。然后,它会创建一个具有最佳服务器细节的新Pheanstalk实例,并将其保存为当前连接,然后调度该命令。

有时工作人员可以发送一个子作业到队列中。该库最初将加载到worker中以获取作业,并再次在process_job中发送到队列中。由于Codeigniter中的依赖注入,该库会引用同一个对象,并且在process_job内部,它将覆盖worker的当前连接并导致TCP断开连接。

相关问题