3

我应该如何多线程一些需要超时的php-cli代码?

我从命令行在 Centos 6.6 上使用 PHP 5.6。

我对多线程术语或代码不是很熟悉。我将在这里简化代码,但它 100% 代表了我想要做的事情。

非线程代码目前看起来像这样:

$datasets = MyLibrary::getAllRawDataFromDBasArrays();
foreach ($datasets as $dataset) {
    MyLibrary::processRawDataAndStoreResultInDB($dataset);
}
exit; // just for clarity

我需要预取所有数据集,并且每个 processRawDataAndStoreResultInDB() 都无法获取它自己的数据集。有时 processRawDataAndStoreResultInDB() 处理数据集的时间太长,所以我想限制它处理它的时间。

所以你可以看到使它成为多线程的

  1. 通过允许同时执行多个 processRawDataAndStoreResultInDB() 来加速它
  2. 使用 set_time_limit() 来限制每个人处理每个数据集的时间

请注意,我不需要回到我的主程序。由于这是一种简化,您可以相信我不想收集所有已处理的数据集并在它们全部完成后将它们保存到数据库中。

我想做类似的事情:

class MyWorkerThread extends SomeThreadType {
  public function __construct($timeout, $dataset) {
    $this->timeout = $timeout;
    $this->dataset = $dataset;
  }

  public function run() {
    set_time_limit($this->timeout);
    MyLibrary::processRawDataAndStoreResultInDB($this->dataset);
  } 
}

$numberOfThreads = 4;
$pool = somePoolClass($numberOfThreads);
$pool->start();

$datasets = MyLibrary::getAllRawDataFromDBasArrays();
$timeoutForEachThread = 5; // seconds
foreach ($datasets as $dataset) {
  $thread = new MyWorkerThread($timeoutForEachThread, $dataset);

  $thread->addCallbackOnTerminated(function() {
    if ($this->isTimeout()) {
      MyLibrary::saveBadDatasetToDb($dataset);
    }
  }

  $pool->addToQueue($thread);
}

$pool->waitUntilAllWorkersAreFinished();
exit; // for clarity

从我的在线研究中,我发现了可以与线程安全的 php CLI 一起使用的 PHP 扩展 pthreads,或者我可以使用 PCNTL 扩展或围绕它的包装库(例如,Arara/Process)

但是,当我查看它们及其示例时(尤其是 pthreads 池示例),我很快就会对术语以及应该使用哪些类来实现我正在寻找的那种多线程感到困惑。

如果我在线程类上有 isRunning()、isTerminated()、getTerminationStatus() 和 execute() 函数,我什至不介意自己创建池类,因为它是一个简单的队列。

有更多经验的人可以指导我使用哪个库、类和函数来映射到上面的示例吗?我完全采取了错误的方法吗?

提前致谢。

4

1 回答 1

0

这是一个使用工作进程的示例。我正在使用 pcntl 扩展。

/**
 * Spawns a worker process and returns it pid or -1 
 * if something goes wrong.
 *
 * @param callback function, closure or method to call
 * @return integer
 */
function worker($callback) {
    $pid = pcntl_fork();
    if($pid === 0) {
        // Child process
        exit($callback());
    } else {
        // Main process or an error
        return $pid;
    }
}


$datasets = array(
    array('test', '123'),
    array('foo', 'bar')
);

$maxWorkers = 1;
$numWorkers = 0;
foreach($datasets as $dataset) {
    $pid = worker(function () use ($dataset) {
        // Do DB stuff here
        var_dump($dataset);
        return 0;
    });

    if($pid !== -1) {
        $numWorkers++;
    } else {
        // Handle fork errors here
        echo 'Failed to spawn worker';
    }

    // If $maxWorkers is reached we need to wait
    // for at least one child to return
    if($numWorkers === $maxWorkers) {
        // $status is passed by reference
        $pid = pcntl_wait($status);
        echo "child process $pid returned $status\n";
        $numWorkers--;
    }
}

// (Non blocking) wait for the remaining childs
while(true) {
    // $status is passed by reference
    $pid = pcntl_wait($status, WNOHANG);

    if(is_null($pid) || $pid === -1) {
        break;
    }

    if($pid === 0) {
        // Be patient ...
        usleep(50000);
        continue;
    }

    echo "child process $pid returned $status\n";
}
于 2015-02-13T22:58:15.013 回答