2017-09-06 366 views
0

我正在尝试将耗时的任务提取到单独的进程。不幸的是,多线程似乎并不是PHP的选项,但您可以使用popen创建新的php进程。PHP popen进程限制?

用例如下:每分钟运行一次cronjob,检查是否有任何需要发送的电子邮件活动。可能需要同时发送多个广告系列,但截至目前,它只是每分钟收到一个广告系列。我想提取广告系列的发送到单独的流程,以便我可以同时发送多个广告系列。

的代码看起来是这样的(注意,这只是一个概念验证):

的crontab

* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1 

maintask.php

for ($i = 0; $i < 4; $i++) { 
    $processName = "Process_{$i}"; 
    echo "Spawn process {$processName}" . PHP_EOL; 

    $process = popen("php subtask.php?process_name={$processName} 2>&1", "r"); 
    stream_set_blocking($process, false); 
} 

子任务.php

$process = $_GET['process_name']; 

echo "Started sleeping process {$process}" . PHP_EOL; 
sleep(rand(10, 40)); 
echo "Stopped sleeping process {$process}" . PHP_EOL; 

现在,我遇到的问题是,popen只会在任何时候产生2个进程,而我试图产卵4.我找不出原因。似乎没有任何限制记录。也许这受到我可用内核数量的限制?

+0

我已经用[proc_open()](http://php.net/manual/en/function.proc-open.php)成功地在过去创建多个子进程。我会看看我是否可以挖掘出代码段 –

+0

你解决了你的问题吗?你怎么知道你的代码只能同时运行2个进程?你的文章中的代码不足以测试这个。 –

+0

@WeeZel不幸的是,我还没有能够解决这个问题。我通过监视活动进程检查了仅有2个正在运行的进程,使用“ps -aux”。我可以看到,只要有一个subtask.php进程完成,一个新进程就会启动。 我想你是对的提供的代码。这是一个试图简单地展示我想要实现的内容,而不会公布所有会使事情过于复杂的实际代码。我会看看我是否可以对我的问题进行演示并更新原始帖子。 谢谢! –

回答

1

我修改了subtask.php,以便您可以看到每个任务的开始时间,结束时间和打算等待的时间。现在你可以看到一个进程启动时/停止可以减少睡眠时间 - 无需使用ps -aux显示当进程正在运行

subtask.php

<?php 
$process = $argv[1]; 

$sleepTime = rand(1, 10); 
echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL; 
sleep($sleepTime); 
echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL; 

我添加类到maintask.php代码,以便您可以测试它...开始从中获得乐趣,当你queue()更多的条目超过您设置maxProcesses(试行32)
注:结果会回来的顺序他们完成

maintask.php

<?php 
class ParallelProcess 
{ 
    private $maxProcesses = 16; // maximum processes 
    private $arrProcessQueue = []; 
    private $arrCommandQueue = []; 

    private function __construct() 
    { 
    } 

    private function __clone() 
    { 
    } 

    /** 
    * 
    * @return \static 
    */ 
    public static function create() 
    { 
     $result = new static(); 
     return $result; 
    } 

    /** 
    * 
    * @param int $maxProcesses 
    * @return \static 
    */ 
    public static function load($maxProcesses = 16) 
    { 
     $result = self::create(); 
     $result->setMaxProcesses($maxProcesses); 
     return $result; 
    } 

    /** 
    * get maximum processes 
    * 
    * @return int 
    */ 
    public function getMaxProcesses() 
    { 
     return $this->maxProcesses; 
    } 

    /** 
    * set maximum processes 
    * 
    * @param int $maxProcesses 
    * @return $this 
    */ 
    public function setMaxProcesses($maxProcesses) 
    { 
     $this->maxProcesses = $maxProcesses; 
     return $this; 
    } 

    /** 
    * number of entries in the process queue 
    * 
    * @return int 
    */ 
    public function processQueueLength() 
    { 
     $result = count($this->arrProcessQueue); 
     return $result; 
    } 

    /** 
    * number of entries in the command queue 
    * 
    * @return int 
    */ 
    public function commandQueueLength() 
    { 
     $result = count($this->arrCommandQueue); 
     return $result; 
    } 


    /** 
    * process open 
    * 
    * @staticvar array $arrDescriptorspec 
    * @param string $strCommand 
    * @return $this 
    * @throws \Exception 
    */ 
    private function p_open($strCommand) 
    { 
     static $arrDescriptorSpec = array(
      0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will reda from 
      1 => array('pipe', 'w'), // stdout is a pipe that the child will write to 
      2 => array('file', '/dev/null', 'w') // stderr is a pipe that the child will write to 
     ); 

     $arrPipes = array(); 
     if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) { 
      throw new \Exception("error: proc_open() failed!"); 
     } 

     $resStream = &$arrPipes[1]; 

     if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) { 
      throw new \Exception("error: stream_set_blocking() failed!"); 
     } 

     $this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream); 
     return $this; 
    } 

    /** 
    * execute any queued commands 
    * 
    * @return $this 
    */ 
    private function executeCommand() 
    { 
     while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) { 
      $strCommand = array_shift($this->arrCommandQueue); 
      $this->p_open($strCommand); 
     } 
     return $this; 
    } 

    /** 
    * process close 
    * 
    * @param array $arrQueueEntry 
    * @return $this 
    */ 
    private function p_close(array $arrQueueEntry) 
    { 
     $resProcess = $arrQueueEntry[1]; 
     $resStream = $arrQueueEntry[2]; 

     fclose($resStream); 

     $this->returnValue = proc_close($resProcess); 

     $this->executeCommand(); 
     return $this; 
    } 

    /** 
    * queue command 
    * 
    * @param string $strCommand 
    * @return $this 
    */ 
    public function queue($strCommand) { 
     // put the command on the $arrCommandQueue 
     $this->arrCommandQueue[] = $strCommand; 
     $this->executeCommand(); 
     return $this; 
    } 

    /** 
    * read from stream 
    * 
    * @param resource $resStream 
    * @return string 
    */ 
    private static function readStream($resStream) 
    { 
     $result = ''; 
     while (($line = fgets($resStream)) !== false) { 
      $result .= $line; 
     } 
     return $result; 
    } 

    /** 
    * read a result from the process queue 
    * 
    * @return string|false 
    */ 
    private function readProcessQueue() 
    { 
     $result = false; 
     reset($this->arrProcessQueue); 
     while ($result === false && list($key, $arrQueueEntry) = each($this->arrProcessQueue)) { 
      $arrStatus = proc_get_status($arrQueueEntry[1]); 
      if ($arrStatus['running'] === false) { 
       array_splice($this->arrProcessQueue, $key, 1); 
       $resStream = $arrQueueEntry[2]; 
       $result = self::readStream($resStream); 
       $this->p_close($arrQueueEntry); 
      } 
     } 
     return $result; 
    } 

    /** 
    * get result from process queue 
    * 
    * @return string|false 
    */ 
    public function readNext() 
    { 
     $result = false; 
     if ($this->processQueueLength() === 0) { 
     } else { 
      while ($result === false and $this->processQueueLength() > 0) { 
       $result = $this->readProcessQueue(); 
      } 
     } 
     return $result; 
    } 
} 

set_time_limit(0); // don't timeout 

$objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes 

for ($i = 0; $i < 4; $i++) { 
    $processName = "Process_{$i}"; 
    echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL; 
    $objParallelProcess->queue("php subtask.php {$processName}"); // queue process 
} 

// loop through process queue 
while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued 
    // process response 
    echo $strResponse; 
} 
+0

这是出色的工作!谢谢一堆! –