2017-08-30 106 views
1

我有一个使用PHP 7.0.22的pthreads v3.1.6的代码。我遇到的问题是线程不返回数组值。我的代码如下pthreads破坏数组结果

$threadCount = 1; 

$url_idList = range(1,2000); 
$url_idListChunked = array_chunk($url_idList, $threadCount); 

class WorkerThreads extends Thread { 
    private $threadName, $url_id; 

    public function __construct($threadName,$url_id) { 
     $this->threadName = $threadName; 
     $this->url_id = $url_id; 
     $this->result = []; 
    } 

    public function run() { 
     if ($this->threadName && $this->url_id) { 
      printf('%sLoading URL #: %s' . "\n", $this->threadName, $this->url_id); 
      $this->result = send_request('GET',$this->url_id,NULL,$this->threadName); 
     } 
    } 
} 

while(count($url_idListChunked)){ 
    $url_idListChunk = array_shift($url_idListChunked); 
    $workers = []; 
    foreach (range(0,count($url_idListChunk)-1) as $i) { 
     $threadName = "Thread #".$i.": "; 
     $workers[$i] = new WorkerThreads($threadName,$url_idListChunk[$i]); 
     $workers[$i]->start(); 
    } 

    foreach (range(0,count($url_idListChunk)-1) as $i) { 
     $workers[$i]->join(); 
     print_r($workers[$i]); 
     exit(); 
     echo $workers[$i]['threadName']."Result for URL #: ".$workers[$i]['url_id']."\n"; 
    } 

} 

function send_request($method,$url_id,$data,$threadName=NULL){ 

    $url = 'https://www.example.tld/?id='.$url_id; 

    $ch = curl_init(); 
    curl_setopt($ch, CURLOPT_URL, $url); 
    curl_setopt($ch, CURLOPT_HEADER, TRUE); 
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE); 
    curl_setopt($ch, CURLOPT_FOLLOWLOCATION, TRUE); 
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, FALSE); 
    curl_setopt($ch, CURLOPT_TIMEOUT, 3); 
    if(!$data && $method=='POST'){ 
     $data = generate_post_data(); 
     curl_setopt($ch, CURLOPT_POST, 1); 
     curl_setopt($ch, CURLOPT_POSTFIELDS, $data); 
    } 
    $response = curl_exec($ch); 
    while((curl_errno($ch) == 6 OR curl_errno($ch) == 28)){ 
     $response = curl_exec($ch); 
     echo $threadName.'Curl error #'.curl_errno($ch).' - ' . curl_error($ch)." Retrying.\n"; 
     sleep(2);  
    } 

    curl_close($ch); 

    $result['data'] = $response; 

    return $result; 

} 

当我尝试print_t($workers)我收到以下错误消息Uncaught RuntimeException: pthreads detected an attempt to connect to an object which has already been destroyed。为什么我会失去数组结果?看起来线程没有问题将字符串传回。

回答

0

你好,我不完全确定使用start-> join方法是你想要实现的首选方法吗?我认为你需要使用pthreads中的pool collectable方法。

这是一个例子,可能会激发你的工作,如果你想从分块批次收集结果。我亲自使用它,这是推动pthreads进入极限的最快方法。小心不要通过CPU线程推送池号(在这个例子中它是10个核心)。

如果我可以说关于你的代码,不要试图从pthreads worker输出屏幕上的东西,它肯定会乱七八糟。返回对象并在集合上回显它。确保你的结果在你的课堂上已经公开允许返回的对象。

更不用说多卷曲,它可能是最快和最恰当的方式。

/* pthreads batches */ 
$batches = array(); 

$nbpool = 20; // cpu 10 cores 

/* job 1 */ 
$list = [/* data1 */]; 
$url_idList[] = array_chunk($list, 5000); 

/* job 2 */ 
$list2 = [/* data2 */]; 
$url_idList[] = array_chunk($list, 10000); 

/* final collected results */ 
$resultFinal = []; 

/* loop across batches */ 
foreach ($url_idList as $key => $url_idListChunked) { 

    $url_idListChunk = array_shift($url_idListChunked); 

    /* for intermediate collection */ 
    $data[$key] = []; 

    /* how many workers */ 
    $workCount = count($url_idListChunk); 

    /* set pool job up to max cpu capabilities */ 
    $pool = new Pool($nbpool, Worker::class); 

    /* pool cycling submit */ 
    foreach (range(1, $workCount) as $i) { 
     $chunck = $url_idListChunk[$i - 1]; 
     $pool->submit(new WorkerThreads(($i - 1), $chunck)); 
    } 

    /* on collection cycling */ 
    $collector = function (\Collectable $work) use (&$data) { 

     /* is worker complete ? */ 
     $isGarbage = $work->isGarbage(); 

     /* worker complete */ 
     if ($isGarbage) { 
      $result = $work->result; 
      $info = $work->info; 
      $data[$key] = $result; 

      /* echo result info outside worker */ 
      echo($info); 
     } 
     return $isGarbage; 
    }; 
    do { 
     /* collection on pool stack */ 
     $count = $pool->collect($collector); 
     $isComplete = count($data) === $workCount; 
    } while (!$isComplete); 

    /* push stack results */ 
    array_push($resultFinal, $data); 

    /* close pool */ 
    $pool->shutdown(); 
} 

class WorkerThreads extends \Threaded implements \Collectable { 

    private $url_id; 
    private $isGarbage; 
    private $threadName; 
    public $result; 
    public $info; 

    public function __construct($i, $url_id) { 
     $this->threadName = "Thread #" . $i . ": "; 
     $this->url_id = $url_id; 
    } 

    public function run() { 
     if ($this->threadName && $this->url_id) { 
      $this->result = send_request('GET', $this->url_id, NULL, $this->threadName); 
     } 
     $this->info = $this->threadName . " Result for URL #: " . $this->url_id; 
     $this->isGarbage = true; // yeah, it s done 
    } 

    public function isGarbage(): bool { 
     return $this->isGarbage; 
    } 

} 
+0

首先,感谢您的回复。即使您提供了评论,我仍然无法理解代码。我尝试运行它,并行'$池 - >提交(新的WorkerThreads(($ i - 1),$ chunck));'是抛出和错误,说'Class'WorkerThreads'找不到'。另外,我还没有达到在$ list数组中有两个作业的目的。 –

+0

您首先需要将pthreads dll添加到php.ini以使pthreads正常工作。它只能在cli模式下工作,因为你可能知道,所以你需要禁用DLL的xampp工作。 Vis等等......你可以添加任意数量的$ list_array,甚至可以为每个批次添加池的数量。这是一个简单的方法来跨批次同步wotk,并避免使用棘手的pthread syncro方法。 – tryHarder

+0

pthreads.dll安装并正常工作。它找不到WorkerThreads类,而不是实际的Threaded类。 –