我有一个由扫描器,解析器和队列类组成的应用程序。我试图在使用PHP pthreads的解析器类的run()
方法中使用多线程。在多线程中访问复杂对象w/PHP pthreads
class Scanner {
public function performScan() {
// Add initial task
$initialTask = "Task 1";
TaskQueue::addTask($initialTask);
$i = 0;
while(true) {
// Get task from queue
$task = TaskQueue::getTask();
if ($task == null)
break;
// Handle task
$parser[$i] = new Parser($task);
$parser[$i]->start();
// Join
$parser[$i]->join();
$i++;
}
// Done
echo "Done\n";
}
}
class Parser extends Thread {
private $task;
public function __construct($task) {
$this->task = $task;
}
public function run() {
// Perform a time-consuming operation
// This operation adds an unknown number of extra tasks
sleep(1);
// Add new tasks to queue
foreach(range(0, 4) as $i) {
TaskQueue::addTask("Task {$i}");
}
}
}
class TaskQueue {
private static $queue = array();
public static function addTask($task) {
self::$queue[] = $task;
echo "Add task to queue!\n";
}
public static function getTask() {
if (sizeof(self::$queue) > 0) {
$task = array_shift(self::$queue);
echo "Get task from queue!\n";
return $task;
}
}
}
$scanner = new Scanner();
$scanner->performScan();
预期的情况是,扫描仪继续工作,直到它出任务(在这种情况下,无限期):
Add task to queue!
Get task from queue!
Add task to queue!
Add task to queue!
Add task to queue!
Add task to queue!
Add task to queue!
Get task from queue!
Add task to queue!
Add task to queue!
Add task to queue!
Add task to queue!
Add task to queue!
Get task from queue!
^C
相反,扫描仪,因为有执行第一线后停止队列中没有更多的任务:
Add task to queue!
Get task from queue!
Add task to queue!
Add task to queue!
Add task to queue!
Add task to queue!
Add task to queue!
Done
我想这是因为该线程创建任务队列的本地副本(因为它不是一个简单的对象,像一个整数或字符串)和将任务添加到该副本,而不是Scanner类使用的副本。我已阅读examples on Github,但我无法提出适当的解决方案。
更新:我修改了代码,使用wait()
和notify()
的建议,但我还是看到了同样的结果。
class Scanner {
public function performScan() {
// Add initial task
$initialTask = "Task 1";
TaskQueue::addTask($initialTask);
$i = 0;
while(true) {
// Get task from queue
$task = TaskQueue::getTask();
if ($task == null)
break;
// Handle task
$parser[$i] = new Parser($task);
$parser[$i]->start();
// Wait
$thread = $parser[$i];
$thread->synchronized(function() use($thread) {
while (!$thread->awake) {
$thread->wait();
}
});
// Join
$i++;
}
// Done
echo "Done\n";
}
}
class Parser extends Thread {
private $task;
public function __construct($task) {
$this->task = $task;
}
public function run() {
// Perform a time-consuming operation
// This operation adds an unknown number of extra tasks
sleep(1);
// Add new tasks to queue
foreach(range(0, 4) as $i) {
TaskQueue::addTask("Task {$i}");
}
// Notify
$this->synchronized(function(){
$this->awake = true;
$this->notify();
});
}
}
class TaskQueue {
private static $queue = array();
public static function addTask($task) {
self::$queue[] = $task;
echo "Add task to queue!\n";
}
public static function getTask() {
if (sizeof(self::$queue) > 0) {
$task = array_shift(self::$queue);
echo "Get task from queue!\n";
return $task;
}
}
}
$scanner = new Scanner();
$scanner->performScan();
请参阅:http://stackoverflow.com/help/mcve –
@JoeWatkins对不起,更新了示例代码以实际工作。 –