2016-04-26 53 views
0

我有一个由扫描器,解析器和队列类组成的应用程序。我试图在使用PHP pthreads的解析器类的run()方法中使用多线程。在多线程中访问复杂对象w/PHP pthreads

class Scanner { 
    public function performScan() { 
     // Add initial task 
     $initialTask = "Task 1"; 
     TaskQueue::addTask($initialTask); 

     $i = 0; 
     while(true) { 
      // Get task from queue 
      $task = TaskQueue::getTask(); 
      if ($task == null) 
       break; 

      // Handle task 
      $parser[$i] = new Parser($task); 
      $parser[$i]->start(); 

      // Join 
      $parser[$i]->join(); 
      $i++; 
     } 

     // Done 
     echo "Done\n"; 
    } 
} 

class Parser extends Thread { 
    private $task; 

    public function __construct($task) { 
     $this->task = $task; 
    } 

    public function run() { 
     // Perform a time-consuming operation 
     // This operation adds an unknown number of extra tasks 
     sleep(1); 

     // Add new tasks to queue 
     foreach(range(0, 4) as $i) { 
      TaskQueue::addTask("Task {$i}"); 
     } 
    } 
} 

class TaskQueue { 
    private static $queue = array(); 

    public static function addTask($task) { 
     self::$queue[] = $task; 
     echo "Add task to queue!\n"; 
    } 

    public static function getTask() { 
     if (sizeof(self::$queue) > 0) { 
      $task = array_shift(self::$queue); 
      echo "Get task from queue!\n"; 
      return $task; 
     } 
    } 
} 

$scanner = new Scanner(); 
$scanner->performScan(); 

预期的情况是,扫描仪继续工作,直到它出任务(在这种情况下,无限期):

Add task to queue! 
Get task from queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Get task from queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Get task from queue! 
^C 

相反,扫描仪,因为有执行第一线后停止队列中没有更多的任务:

Add task to queue! 
Get task from queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Add task to queue! 
Done 

我想这是因为该线程创建任务队列的本地副本(因为它不是一个简单的对象,像一个整数或字符串)和将任务添加到该副本,而不是Scanner类使用的副本。我已阅读examples on Github,但我无法提出适当的解决方案。

更新:我修改了代码,使用wait()notify()的建议,但我还是看到了同样的结果。

class Scanner { 
    public function performScan() { 
     // Add initial task 
     $initialTask = "Task 1"; 
     TaskQueue::addTask($initialTask); 

     $i = 0; 
     while(true) { 
      // Get task from queue 
      $task = TaskQueue::getTask(); 
      if ($task == null) 
       break; 

      // Handle task 
      $parser[$i] = new Parser($task); 
      $parser[$i]->start(); 

      // Wait 
      $thread = $parser[$i]; 
      $thread->synchronized(function() use($thread) { 
       while (!$thread->awake) { 
        $thread->wait(); 
       } 
      }); 

      // Join 
      $i++; 
     } 

     // Done 
     echo "Done\n"; 
    } 
} 

class Parser extends Thread { 
    private $task; 

    public function __construct($task) { 
     $this->task = $task; 
    } 

    public function run() { 
     // Perform a time-consuming operation 
     // This operation adds an unknown number of extra tasks 
     sleep(1); 

     // Add new tasks to queue 
     foreach(range(0, 4) as $i) { 
      TaskQueue::addTask("Task {$i}"); 
     } 

     // Notify 
     $this->synchronized(function(){ 
      $this->awake = true; 
      $this->notify(); 
     }); 
    } 
} 

class TaskQueue { 
    private static $queue = array(); 

    public static function addTask($task) { 
     self::$queue[] = $task; 
     echo "Add task to queue!\n"; 
    } 

    public static function getTask() { 
     if (sizeof(self::$queue) > 0) { 
      $task = array_shift(self::$queue); 
      echo "Get task from queue!\n"; 
      return $task; 
     } 
    } 
} 

$scanner = new Scanner(); 
$scanner->performScan(); 
+0

请参阅:http://stackoverflow.com/help/mcve –

+0

@JoeWatkins对不起,更新了示例代码以实际工作。 –

回答

0

您需要使用getTask()Cond::wait(),使扫描仪等到有更多的任务,并在addTask()Cond::signal()当你插入一个新的任务,要叫醒他起来,继续采摘任务

http://php.net/manual/en/cond.wait.php

http://php.net/manual/en/cond.signal.php

另一种方法是使用从0 wait()notify()方法class

+0

我调用'$ thread-> join()'时调用wait()和notify()是不需要的,因为它确保调用线程在第二个线程完成之前等待)? –

+0

@PeterVanAkelyen'join()'用于确保线程结束/等待另一个线程同时完成(像屏障一样使用)。等待是用来停止线程,直到我们知道我们可以唤醒他 – Nadir

+0

我更新了代码以使用'wait()'和'notify()',但我仍然看到相同的结果。 –