1

Pool在 PHP 中使用对象pthread,并制作了以下测试脚本,以查看池应该如何工作。我认为,池化应该做的是获得给定数量的任务,打开最大x数量的工人,并分配给他们任务,一旦工人完成任务,如果有更多任务可用,分配给那个工人一个新的任务。

给定以下示例和上述假设:

class Work extends Threaded {
    public $id;

    public function __construct($id) {
        $this->id = $id;
    }

    public function run() {
        if ($this->id == 0) {
            sleep(3);
            echo $this->id . " is ready\n";
            return;
        } else {
            echo $this->id . " is ready\n";
            return;
        }
    }
}

$pool = new Pool(2, 'Worker', []);
for ($i=0; $i<4; $i++) $pool->submit(new Work($i));
while ($pool->collect());
$pool->shutdown();

我期待这个脚本输出以下信息:

1 准备好了
2 准备好了
3 准备好了
0 准备好了

因为,基本上有 2 个工人可用,并且由于sleep第一个工人偶然发现的操作,任务 1,2,3 必须由第二个工人完成。

而不是这个,我得到的输出是:

1 准备好了
3 准备好了
0 准备好了
2 准备好了

很明显,工人 1 在开始时被分配了作业 0 和作业 2,因此工人 2 在完成作业 1 和作业 3 后只是等待,而不是从工人 1 那里接管作业 2。

这是一个错误吗?还是打算以这种方式工作?

我的 PHP 版本:

PHP 7.2.14 (cli) (built: Jan  9 2019 22:23:26) ( ZTS MSVC15 (Visual C++ 2017) x64 )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.2.0, Copyright (c) 1998-2018 Zend Technologies
4

3 回答 3

0

由于某种原因,我的 Docker 现在已经将 Windows 更新到 1809,所以我的 Docker 已经崩溃了,所以发布未经测试。(很抱歉,没有输出给atm)


修改了我在带有您的计数器 + 睡眠的项目中使用的现有代码。

$pool = new Pool(2);
foreach ([0,1,2,3] as $count) {
    $pool->submit(
        new class ($count) extends Threaded
        {
            private $count;

            public function __construct(int $count)
            {
                $this->count= $count;
            }

            public function run()
            {
                if ($this->count== 0) {
                    sleep(3);
                    echo $this->count . " is ready\n";
                } else {
                    echo $this->count . " is ready\n";
                }
            }
        }
    );
}

while ($pool->collect());

$pool->shutdown();

我使用匿名类 ( new class ($count) extends Threaded) 作为submit()参数。

在服务器上运行完美,使用在 Alpine 3.8 上运行 PHP ZTS 7.2.13 的 Docker 实例

于 2019-01-17T08:19:40.450 回答
0

让我回答:根据我对 php 中 pthreads 的了解,池就像可以同时运行的处理 php.exe 的数量。

因此,在您的情况下,您可以使用定义两个池new Pool(2, 'Worker', []);

因此,让我们对其进行抽象解释。有 2 个池,称为PoolAPoolB

从 0 到 3 循环,每个循环都向 Pool 提交任务。

从 0 到 3 有 4 个任务,我们用task0, task1, task2,来调用它们task3

当循环发生时,从我的角度来看,它应该是这样的队列

PoolA -> submit task0
PoolB -> submit task1
PoolA -> submit task2
PoolB -> submit task3

但从class Work那将是task0,......直到task3。

情况/条件

当参数(在这种情况下来自构造函数的 $id)为 0 时,您在 run() => 中定义一些逻辑,然后sleep(3).

从这种情况来看,PoolA哪个提交task0包含参数($id)的值为 0,PoolA将等待 3 秒。PoolA也提交task2

另一方面,PoolB提交task1and task3,从这种情况来看,不需要等待 3 秒。

因此,当while($pool->collect());运行时,最有可能发生的可能队列

task1    (PoolB)
task3    (PoolB)
task0    (PoolA)  ->>>> PoolA delayed because from task0 needs to sleep for 3 seconds
task2    (PoolA)

所以我认为输出是正确的

1 准备好了
3 准备好了
0 准备好了
2 准备好了

有一个问题。

为什么只有 PoolA 延迟了,即使 PoolA 延迟了为什么 task2 没有提交给 PoolB 或者为什么 task1 或 task3 没有提交给 PoolA?

嗯,我也不懂。我有与你类似的任务,经过多次实验,我不确定pthreads使用Pool & Threaded的是multi-threading or multiprocessing.

于 2019-03-20T10:53:22.630 回答
0

来自各个线程的回声可能具有欺骗性。我经常发现它们似乎在被调用之前就已经执行了。我建议避免从内部线程回显,除非您不关心顺序,因为它对于测试特定情况等仍然有用。

下面是一些代码,可以解决代码执行时间的任何问题,因为此代码按结果执行的实际时间对结果进行排序。(这也是如何从线程池取回结果的一个很好的例子。)

<?php
class Work extends Threaded {
    public $id;
    public $data;
    private $complete = false;
    public function __construct($id) {
        $this->id = $id;
    }

    public function run() {
        $temp = array();
        if ($this->id == 0) {
            echo "<pre>".$this->id . " started (from inside threaded)";
            $temp[] = array(microtime(true), $this->id . " started");
            sleep(3);
        }
        echo "<pre>".$this->id . " is ready (from inside threaded)";
        $temp[] = array(microtime(true), $this->id . " is ready");
        $this->data = (array) $temp; // note: it's important to cast as array, otherwise you will get a volitile
        $this->complete = true;
    }

    public function isDone() {
        return $this->complete;
    }
}

// we create a custom pool, to pass on our results
class ExamplePool extends Pool {
    public $dataAr = array(); // used to return data after we're done
    private $numTasks = 0; // counter used to know when we're done
    private $numCompleted = 0; // keep track of how many threads finished
    /**
     * override the submit function from the parent
     * to keep track of our jobs
     */
    public function submit(Threaded $task) {
        $this->numTasks++;
        parent::submit($task);
    }
    /**
     * used to wait until all workers are done
     */
    public function process() {
        // Run this loop as long as we have
        // jobs in the pool
        while ($this->numCompleted < $this->numTasks) {
            $this->collect(function (Work $task) {
                // If a task was marked as done, collect its results
                if ($task->isDone()) {
                    //this is how you get your completed data back out [accessed by $pool->process()]
                    $this->dataAr = array_merge($this->dataAr, $task->data);
                    $this->numCompleted++;
                }
                return $task->isDone();
            });
        }
        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();
        return $this->dataAr;
    }
}

$pool = new ExamplePool(4);
for($i=0; $i<4; $i++) { 
    $pool->submit(new Work($i));
}
$retArr = $pool->process();
usort($retArr, 'sortResultsByTime'); // sort the results by time

// echo out the sorted results
echo "<br><br>";
for($i=0;$i<count($retArr);$i++){
    echo number_format($retArr[$i][0], 4, ".", "").' '.$retArr[$i][1]."\n";
}

function sortResultsByTime($a, $b) {
    return $a[0] > $b[0];
}
?>

请注意上面的代码为我产生了这个:

0 started (from inside threaded)
0 is ready (from inside threaded)
1 is ready (from inside threaded)
2 is ready (from inside threaded)
3 is ready (from inside threaded)

1609458117.8764 0 started
1609458117.8776 1 is ready
1609458117.8789 2 is ready
1609458117.8802 3 is ready
1609458120.8765 0 is ready

正如预期的那样,从线程内部回显的东西看起来很奇怪,但是如果您存储结果,并按执行时间对它们进行排序,您可以看到它按预期运行。

于 2021-01-01T00:00:11.187 回答