
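I currently have a website written in PHP that uses curl_multi to poll external APIs. The server forks child processes that run independently of web requests, and this works well, but it is limited to a per-process basis.

Occasionally it hits bandwidth bottlenecks and needs better centralized queuing logic.

I am currently trying PHP IPC with a standalone background process that handles all outgoing requests, but I got stuck on things that casual programmers are usually said to be unlikely to get right: garbage collection, inter-process exception handling, request-response matching, and so on. Am I going the wrong way?

Is there a common practice (implementation theory) out there, or even a library I can make use of?

EDIT

Using localhost TCP/IP communication would double the stress of local traffic, which is definitely not a good approach.

I am currently working on an IPC message queue with some home-brew protocol... it does not look right at all. Any help would be appreciated.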

3 Answers

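There are several different things to distinguish here:

  • Jobs: you have N jobs to process. The tasks being executed may crash or hang; either way, all jobs should be processed without any data loss.

  • Resources: you are processing your jobs on a single machine and/or over a single connection, so you need to take care of your CPU and bandwidth.

  • Synchronization: if your processes interact with each other, you need to share information and handle concurrent data access.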


Keep control over resources

(Figure: everyone wants to get on the bus...)

As PHP has no built-in threads, we need to simulate mutexes. The principle is quite simple:

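1. All jobs are put in a queue

2. There are N resources available in a pool, and no more

3. We iterate over the queue (a while loop over each job)

4. Before execution, a job asks the pool for a resource

5. If a resource is available, the job is executed

6. If there are no more resources, the pool hangs until a job finishes or is considered dead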
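
As a rough illustration of the six steps above, here is a minimal single-process sketch. It assumes PHP 7 or later. The function name simulatePool and the tick-based job durations are made up for this example, and the "considered dead" timeout is left out, so treat it as a toy model of the principle rather than as the real pool.

```php
<?php
// Hypothetical helper: simulates the pool principle inside one process.
// $durations maps a job name to the number of ticks that job needs.
function simulatePool(array $durations, int $maxResources): array
{
    $queue = array_keys($durations); // step 1: all jobs are put in a queue
    $running = array();              // job name => remaining ticks (busy resources)
    $finished = array();
    $peak = 0;

    while ($queue || $running) {
        // steps 3 to 5: start queued jobs while a resource is free (step 2: at most N)
        while ($queue && count($running) < $maxResources) {
            $job = array_shift($queue);
            $running[$job] = $durations[$job];
        }
        $peak = max($peak, count($running));

        // step 6: the pool is full (or the queue is empty), so wait one tick
        foreach ($running as $job => $left) {
            if ($left <= 1) {
                unset($running[$job]); // the job finished and frees its resource
                $finished[] = $job;
            } else {
                $running[$job] = $left - 1;
            }
        }
    }

    return array('peak' => $peak, 'finished' => $finished);
}

$result = simulatePool(array('a' => 2, 'b' => 1, 'c' => 3, 'd' => 1), 2);
echo "Peak concurrency: {$result['peak']}\n";
echo "Completion order: " . implode(', ', $result['finished']) . "\n";
```

With two resources, no more than two jobs ever run at the same time, whatever the size of the queue.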

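(Figure: how a mutex works)

How to do that in PHP?

To proceed, we have several possibilities, but the principle is the same:

We have 2 programs:

  • A process launcher, which launches no more than N tasks simultaneously.
  • A process child, which represents a thread's context.

What does a process launcher look like?

The process launcher knows how many tasks should be run and runs them without caring about their result. It only controls their execution (a process starts, finishes, or hangs, while N are already running).

Here is the idea in PHP; usable examples follow later: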

<?php
// launcher.php

require_once("ProcessesPool.php");

// The label identifies your process pool, it should be unique for your process launcher and your process children
$multi = new ProcessesPool($label = 'test');

// Initialize a new pool (creates the right directory or file, cleans a database or whatever you want)
// 10 is the maximum number of simultaneously run processes
if ($multi->create($max = '10') == false)
{
    echo "Pool creation failed ...\n";
    exit();
}

// We need to launch N processes, stored in $count
$count = 100; // maybe count($jobs)

// We execute all process, one by one
for ($i = 0; ($i < $count); $i++)
{
    // The waitForResource method checks how many processes are already running,
    // and hangs until a resource is free or the maximum execution time is reached.
    $ret = $multi->waitForResource($timeout = 10, $interval = 500000);
    if ($ret)
    {
        // A resource is free, so we can run a new process
        echo "Execute new process: $i\n";
        exec("/usr/bin/php ./child.php $i > /dev/null &");
    }
    else
    {
        // Timeout is reached, we consider all children as dead and we  kill them.
        echo "WaitForResources Timeout! Killing zombies...\n";
        $multi->killAllResources();
        break;
    }
}

// All process has been executed, but this does not mean they finished their work.
// This is important to follow the last executed processes to avoid zombies.
$ret = $multi->waitForTheEnd($timeout = 10, $interval = 500000);
if ($ret == false)
{
    echo "WaitForTheEnd Timeout! Killing zombies...\n";
    $multi->killAllResources();
}

// We destroy the process pool because we run all processes.
$multi->destroy();
echo "Finish.\n";

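What about the process child, which simulates a thread's context?

A child (the job to be done) has only three things to do:

  • Tell the process launcher it has started
  • Do its job
  • Tell the process launcher it has finished

In PHP, it may contain something like this: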

<?php
// child.php

require_once("ProcessesPool.php");

// we create the *same* instance of the process pool
$multi = new ProcessesPool($label = 'test');

// child tells the launcher it started (there will be one more resource busy in pool)
$multi->start();

// here I simulate job's execution
sleep(rand() % 5 + 1);

// child tells the launcher it finished his job (there will be one more resource free in pool)
$multi->finish();

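Your usage example is nice, but where is the ProcessesPool class?

There are many ways to synchronize tasks, but it really depends on your requirements and constraints.

You can synchronize your tasks using:

  • a single file
  • a database
  • a directory and several files
  • probably other methods (such as system IPC)

As we have already seen, we need at least 7 methods:

1. create() creates an empty pool

2. start() takes a resource in the pool

3. finish() releases the resource

4. waitForResource() hangs if no more resources are available

5. killAllResources() gets all the launched jobs in the pool and kills them

6. waitForTheEnd() hangs until no resource is busy anymore

7. destroy() destroys the pool

So let's begin by creating an abstract class; we will then be able to implement it using the ways listed above.

AbstractProcessesPool.php (PHP):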

<?php

// AbstractProcessesPool.php

abstract class AbstractProcessesPool
{

    abstract protected function _createPool();

    abstract protected function _cleanPool();

    abstract protected function _destroyPool();

    abstract protected function _getPoolAge();

    abstract protected function _countPid();

    abstract protected function _addPid($pid);

    abstract protected function _removePid($pid);

    abstract protected function _getPidList();

    protected $_label;
    protected $_max;
    protected $_pid;

    public function __construct($label)
    {
        $this->_max = 0;
        $this->_label = $label;
        $this->_pid = getmypid();
    }

    public function getLabel()
    {
        return ($this->_label);
    }

    public function create($max = 20)
    {
        $this->_max = $max;
        $ret = $this->_createPool();
        return $ret;
    }

    public function destroy()
    {
        $ret = $this->_destroyPool();
        return $ret;
    }

    public function waitForResource($timeout = 120, $interval = 500000, $callback = null)
    {
        // let enough time for children to take a resource
        usleep(200000);
        while (true)
        {
            if (($callback != null) && (is_callable($callback)))
            {
                call_user_func($callback, $this);
            }
            $age = $this->_getPoolAge();
            if ($age == -1)
            {
                return false;
            }
            if ($age > $timeout)
            {
                return false;
            }
            $count = $this->_countPid();
            if ($count == -1)
            {
                return false;
            }
            if ($count < $this->_max)
            {
                break;
            }
            usleep($interval);
        }
        return true;
    }

    public function waitForTheEnd($timeout = 3600, $interval = 500000, $callback = null)
    {
        // let enough time to the last child to take a resource
        usleep(200000);
        while (true)
        {
            if (($callback != null) && (is_callable($callback)))
            {
                call_user_func($callback, $this);
            }
            $age = $this->_getPoolAge();
            if ($age == -1)
            {
                return false;
            }
            if ($age > $timeout)
            {
                return false;
            }
            $count = $this->_countPid();
            if ($count == -1)
            {
                return false;
            }
            if ($count == 0)
            {
                break;
            }
            usleep($interval);
        }
        return true;
    }

    public function start()
    {
        $ret = $this->_addPid($this->_pid);
        return $ret;
    }

    public function finish()
    {
        $ret = $this->_removePid($this->_pid);
        return $ret;
    }

    public function killAllResources($code = 9)
    {
        $pids = $this->_getPidList();
        if ($pids == false)
        {
            $this->_cleanPool();
            return false;
        }
        foreach ($pids as $pid)
        {
            $pid = intval($pid);
            posix_kill($pid, $code);
            if ($this->_removePid($pid) == false)
            {
                return false;
            }
        }
        return true;
    }

}

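Synchronization using a directory and several files

If you want to use the directory method (on a /dev/ram1 partition, for example), the implementation will be:

1. create() creates an empty directory named after the given $label

2. start() creates a file in the directory, named after the child's pid

3. finish() deletes the child's file

4. waitForResource() counts the files in that directory

5. killAllResources() reads the directory content and kills all the pids

6. waitForTheEnd() reads the directory until there are no more files

7. destroy() removes the directory

This method looks costly, but it is really efficient if you want to run hundreds of tasks simultaneously without opening as many database connections as there are jobs to execute.

Implementation

ProcessesPoolFiles.php (PHP):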

<?php

// ProcessesPoolFiles.php

class ProcessesPoolFiles extends AbstractProcessesPool
{

    protected $_dir;

    public function __construct($label, $dir)
    {
        parent::__construct($label);
        if ((!is_dir($dir)) || (!is_writable($dir)))
        {
            throw new Exception("Directory '{$dir}' does not exist or is not writable.");
        }
        $sha1 = sha1($label);
        $this->_dir = "{$dir}/pool_{$sha1}";
    }

    protected function _createPool()
    {
        if ((!is_dir($this->_dir)) && (!mkdir($this->_dir, 0777)))
        {
            throw new Exception("Could not create '{$this->_dir}'");
        }
        if ($this->_cleanPool() == false)
        {
            return false;
        }
        return true;
    }

    protected function _cleanPool()
    {
        $dh = opendir($this->_dir);
        if ($dh == false)
        {
            return false;
        }
        while (($file = readdir($dh)) !== false)
        {
            if (($file != '.') && ($file != '..'))
            {
                if (unlink($this->_dir . '/' . $file) == false)
                {
                    return false;
                }
            }
        }
        closedir($dh);
        return true;
    }

    protected function _destroyPool()
    {
        if ($this->_cleanPool() == false)
        {
            return false;
        }
        if (!rmdir($this->_dir))
        {
            return false;
        }
        return true;
    }

    protected function _getPoolAge()
    {
        $age = -1;
        $count = 0;
        $dh = opendir($this->_dir);
        if ($dh == false)
        {
            return -1; // callers test for -1, not false
        }
        while (($file = readdir($dh)) !== false)
        {
            if (($file != '.') && ($file != '..'))
            {
                $stat = @stat($this->_dir . '/' . $file);
                // a child may have removed its file between readdir() and stat()
                if (($stat !== false) && ($stat['mtime'] > $age))
                {
                    $age = $stat['mtime'];
                }
                $count++;
            }
        }
        closedir($dh);
        clearstatcache();
        return (($count > 0) ? (@time() - $age) : (0));
    }

    protected function _countPid()
    {
        $count = 0;
        $dh = opendir($this->_dir);
        if ($dh == false)
        {
            return -1;
        }
        while (($file = readdir($dh)) !== false)
        {
            if (($file != '.') && ($file != '..'))
            {
                $count++;
            }
        }
        closedir($dh);
        return $count;
    }

    protected function _addPid($pid)
    {
        $file = $this->_dir . "/" . $pid;
        if (is_file($file))
        {
            return true;
        }
        $file = fopen($file, 'w');
        if ($file == false)
        {
            return false;
        }
        fclose($file);
        return true;
    }

    protected function _removePid($pid)
    {
        $file = $this->_dir . "/" . $pid;
        if (!is_file($file))
        {
            return true;
        }
        if (unlink($file) == false)
        {
            return false;
        }
        return true;
    }

    protected function _getPidList()
    {
        $array = array ();
        $dh = opendir($this->_dir);
        if ($dh == false)
        {
            return false;
        }
        while (($file = readdir($dh)) !== false)
        {
            if (($file != '.') && ($file != '..'))
            {
                $array[] = $file;
            }
        }
        closedir($dh);
        return $array;
    }

}

PHP demo, the process launcher:

<?php

// pool_files_launcher.php

require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolFiles.php");

$multi = new ProcessesPoolFiles($label = 'test', $dir = "/tmp");

if ($multi->create($max = '10') == false)
{
    echo "Pool creation failed ...\n";
    exit();
}

$count = 20;

for ($i = 0; ($i < $count); $i++)
{
    $ret = $multi->waitForResource($timeout = 10, $interval = 500000, 'test_waitForResource');
    if ($ret)
    {
        echo "Execute new process: $i\n";
        exec("/usr/bin/php ./pool_files_calc.php $i > /dev/null &");
    }
    else
    {
        echo "WaitForResources Timeout! Killing zombies...\n";
        $multi->killAllResources();
        break;
    }
}

$ret = $multi->waitForTheEnd($timeout = 10, $interval = 500000, 'test_waitForTheEnd');
if ($ret == false)
{
    echo "WaitForTheEnd Timeout! Killing zombies...\n";
    $multi->killAllResources();
}

$multi->destroy();
echo "Finish.\n";

function test_waitForResource($multi)
{
    echo "Waiting for available resource ( {$multi->getLabel()} )...\n";
}

function test_waitForTheEnd($multi)
{
    echo "Waiting for all resources to finish ( {$multi->getLabel()} )...\n";
}

PHP demo, the process child:

<?php

// pool_files_calc.php

require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolFiles.php");

$multi = new ProcessesPoolFiles($label = 'test', $dir = "/tmp");

$multi->start();

// here I simulate job's execution
sleep(rand() % 7 + 1);

$multi->finish();

Synchronization using a database

If you prefer the database method, you will need a MySQL table like this one:

CREATE TABLE `processes_pool` (
  `label` varchar(40) PRIMARY KEY,
  `nb_launched` mediumint(6) unsigned NOT NULL,
  `pid_list` varchar(2048) default NULL,
  `updated` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

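Then, the implementation will be something like:

1. create() inserts a new row in the table above

2. start() inserts a pid into the pid list

3. finish() removes a pid from the pid list

4. waitForResource() reads the nb_launched field

5. killAllResources() gets and kills every pid

6. waitForTheEnd() hangs and checks regularly until nb_launched equals 0

7. destroy() removes the row

Implementation

ProcessesPoolMySQL.php (PHP):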

<?php

// ProcessesPoolMySQL.php

class ProcessesPoolMySQL extends AbstractProcessesPool
{

    protected $_sql;

    public function __construct($label, PDO $sql)
    {
        parent::__construct($label);
        $this->_sql = $sql;
        $this->_label = sha1($label);
    }

    protected function _createPool()
    {
        $request = "
            INSERT IGNORE INTO processes_pool
            VALUES ( ?, ?, NULL, CURRENT_TIMESTAMP )
        ";
        $this->_query($request, $this->_label, 0);
        return $this->_cleanPool();
    }

    protected function _cleanPool()
    {
        $request = "
            UPDATE processes_pool
            SET
                nb_launched = ?,
                pid_list = NULL,
                updated = CURRENT_TIMESTAMP
            WHERE label = ?
        ";
        $this->_query($request, 0, $this->_label);
        return true;
    }

    protected function _destroyPool()
    {
        $request = "
            DELETE FROM processes_pool
            WHERE label = ?
        ";
        $this->_query($request, $this->_label);
        return true;
    }

    protected function _getPoolAge()
    {
        $request = "
            SELECT TIMESTAMPDIFF(SECOND, updated, CURRENT_TIMESTAMP) AS age
            FROM processes_pool
            WHERE label = ?
         ";
        $ret = $this->_query($request, $this->_label);
        if ($ret === null)
        {
            return -1;
        }
        return $ret['age'];
    }

    protected function _countPid()
    {
        $req = "
            SELECT nb_launched AS nb
            FROM processes_pool
            WHERE label = ?
        ";
        $ret = $this->_query($req, $this->_label);
        if ($ret === null)
        {
            return -1;
        }
        return $ret['nb'];
    }

    protected function _addPid($pid)
    {
        $request = "
            UPDATE processes_pool
            SET
                nb_launched = (nb_launched + 1),
                pid_list = CONCAT_WS(',', (SELECT IF(LENGTH(pid_list) = 0, NULL, pid_list )), ?),
                updated = CURRENT_TIMESTAMP
            WHERE label = ?
        ";
        $this->_query($request, $pid, $this->_label);
        return true;
    }

    protected function _removePid($pid)
    {
        $req = "
            UPDATE processes_pool
            SET
                nb_launched = (nb_launched - 1),
                pid_list =
                    CONCAT_WS(',', (SELECT IF (LENGTH(
                        SUBSTRING_INDEX(pid_list, ',', (FIND_IN_SET(?, pid_list) - 1))) = 0, null,
                            SUBSTRING_INDEX(pid_list, ',', (FIND_IN_SET(?, pid_list) - 1)))), (SELECT IF (LENGTH(
                                SUBSTRING_INDEX(pid_list, ',', (-1 * ((LENGTH(pid_list) - LENGTH(REPLACE(pid_list, ',', ''))) + 1 - FIND_IN_SET(?, pid_list))))) = 0, null,
                                    SUBSTRING_INDEX(pid_list, ',', (-1 * ((LENGTH(pid_list) - LENGTH(REPLACE(pid_list, ',', ''))) + 1 - FIND_IN_SET(?, pid_list))
                                )
                            )
                        )
                    )
                 ),
                updated = CURRENT_TIMESTAMP
            WHERE label = ?";
        $this->_query($req, $pid, $pid, $pid, $pid, $this->_label);
        return true;
    }

    protected function _getPidList()
    {
        $req = "
            SELECT pid_list
            FROM processes_pool
            WHERE label = ?
        ";
        $ret = $this->_query($req, $this->_label);
        if ($ret === null)
        {
            return false;
        }
        if ($ret['pid_list'] == null)
        {
            return array();
        }
        $pid_list = explode(',', $ret['pid_list']);
        return $pid_list;
    }

    protected function _query($request)
    {
        $return = null;

        $stmt = $this->_sql->prepare($request);
        if ($stmt === false)
        {
            return $return;
        }

        $params = func_get_args();
        array_shift($params);

        if ($stmt->execute($params) === false)
        {
            return $return;
        }

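        if (strncasecmp(trim($request), 'SELECT', 6) === 0)
        {
            // fetch() returns false when no row matches; normalize to null for callers
            $row = $stmt->fetch(PDO::FETCH_ASSOC);
            $return = ($row === false) ? null : $row;
        }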

        return $return;
    }

}

PHP demo, the process launcher:

<?php

// pool_mysql_launcher.php

require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");

$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

$multi = new ProcessesPoolMySQL($label = 'test', $dbh);

if ($multi->create($max = '10') == false)
{
    echo "Pool creation failed ...\n";
    exit();
}

$count = 20;

for ($i = 0; ($i < $count); $i++)
{
    $ret = $multi->waitForResource($timeout = 10, $interval = 500000, 'test_waitForResource');
    if ($ret)
    {
        echo "Execute new process: $i\n";
        exec("/usr/bin/php ./pool_mysql_calc.php $i > /dev/null &");
    }
    else
    {
        echo "WaitForResources Timeout! Killing zombies...\n";
        $multi->killAllResources();
        break;
    }
}

$ret = $multi->waitForTheEnd($timeout = 10, $interval = 500000, 'test_waitForTheEnd');
if ($ret == false)
{
    echo "WaitForTheEnd Timeout! Killing zombies...\n";
    $multi->killAllResources();
}

$multi->destroy();
echo "Finish.\n";

function test_waitForResource($multi)
{
    echo "Waiting for available resource ( {$multi->getLabel()} )...\n";
}

function test_waitForTheEnd($multi)
{
    echo "Waiting for all resources to finish ( {$multi->getLabel()} )...\n";
}

PHP demo, the process child:

<?php

// pool_mysql_calc.php

require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");

$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

$multi = new ProcessesPoolMySQL($label = 'test', $dbh);

$multi->start();

// here I simulate job's execution
sleep(rand() % 7 + 1);

$multi->finish();

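What is the output of the code above?

Demo output: fortunately, these demos give about the same output. If the timeout is not reached (the dream case), the output is: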

KolyMac:TaskManager ninsuo$ php pool_files_launcher.php 
Waiting for available resource ( test )...
Execute new process: 0
Waiting for available resource ( test )...
Execute new process: 1
Waiting for available resource ( test )...
Execute new process: 2
Waiting for available resource ( test )...
Execute new process: 3
Waiting for available resource ( test )...
Execute new process: 4
Waiting for available resource ( test )...
Execute new process: 5
Waiting for available resource ( test )...
Execute new process: 6
Waiting for available resource ( test )...
Execute new process: 7
Waiting for available resource ( test )...
Execute new process: 8
Waiting for available resource ( test )...
Execute new process: 9
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Execute new process: 10
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Execute new process: 11
Waiting for available resource ( test )...
Execute new process: 12
Waiting for available resource ( test )...
Execute new process: 13
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Execute new process: 14
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Execute new process: 15
Waiting for available resource ( test )...
Execute new process: 16
Waiting for available resource ( test )...
Execute new process: 17
Waiting for available resource ( test )...
Execute new process: 18
Waiting for available resource ( test )...
Execute new process: 19
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Waiting for all resources to finish ( test )...
Finish.

Demo output: in a worse case (I changed sleep(rand() % 7 + 1); to sleep(rand() % 7 + 100);), this gives:

KolyMac:TaskManager ninsuo$ php pool_files_launcher.php 
Waiting for available resource ( test )...
Execute new process: 0
Waiting for available resource ( test )...
Execute new process: 1
Waiting for available resource ( test )...
Execute new process: 2
Waiting for available resource ( test )...
Execute new process: 3
Waiting for available resource ( test )...
Execute new process: 4
Waiting for available resource ( test )...
Execute new process: 5
Waiting for available resource ( test )...
Execute new process: 6
Waiting for available resource ( test )...
Execute new process: 7
Waiting for available resource ( test )...
Execute new process: 8
Waiting for available resource ( test )...
Execute new process: 9
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
(...)
Waiting for available resource ( test )...
Waiting for available resource ( test )...
Waiting for available resource ( test )...
WaitForResources Timeout! Killing zombies...
Waiting for all resources to finish ( test )...
Finish.

Go to page 2 to continue reading this answer.

answered 2013-05-15T19:27:11.947

Page 2: SO answers are limited to 30k characters in the body, so I need to create a new one.


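Control results

(Figure: no mistakes allowed!)

Yes! You can launch plenty of processes without worrying about resources. But what if a child process fails? There will be an undone or incomplete job!

In fact, this is simpler (far simpler) than controlling process execution. We have a queue of jobs executed using a pool, and we only need to know whether a job failed or succeeded after its execution. If there are failures once the whole pool has been executed, the failed jobs are put in a new pool and executed again.

(Figure: result control)

How to proceed in PHP?

This principle is based on clusters: the queue contains several jobs, but represents only one entity. Every calculation of the cluster must succeed for that entity to be complete.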

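Roadmap:

1. We create a todo list containing all the calculations of the cluster (not to be confused with the queue, which is used for process management). Each job has a status: waiting (not executed), running (executed but not finished), success, or failed (according to its result). Of course, at this step, every status is WAITING.

2. We run all the jobs using the process manager (to keep control over resources). Each job starts by telling the task manager that it is running and, depending on its own context, ends by indicating its status (failed or success).

3. When the whole queue has been executed, the task manager creates a new queue containing the failed jobs, and loops again.

4. When all jobs have succeeded, you are done, and you are sure nothing went wrong. Your cluster is complete and your entity is usable at the upper level.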
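
To make the roadmap concrete, here is a minimal in-process sketch of the retry loop, assuming PHP 7 or later. The names runCluster, $runJob and $maxRounds are hypothetical: $runJob stands in for "launch a child and read back its final status", and the round limit is an extra safety net that is not part of the roadmap above.

```php
<?php
// Hypothetical helper: runs every job, then re-runs the failed ones until all succeed.
function runCluster(array $jobs, callable $runJob, int $maxRounds = 10): array
{
    $status = array_fill_keys($jobs, 'waiting'); // step 1: the todo list
    $todo = $jobs;
    $rounds = 0;

    while ($todo && $rounds < $maxRounds) {
        $rounds++;
        foreach ($todo as $job) {                // step 2: run each queued job
            $status[$job] = 'running';
            $status[$job] = $runJob($job) ? 'success' : 'failed';
        }
        // step 3: the next queue only contains the failed jobs
        $todo = array_keys($status, 'failed', true);
    }

    // step 4: when $todo is empty, every job has succeeded
    return array('status' => $status, 'rounds' => $rounds);
}

// Deterministic stand-in for real work: a job succeeds on its Nth attempt.
$needed = array('job-1' => 1, 'job-2' => 3, 'job-3' => 2);
$attempts = array();
$runJob = function ($job) use (&$attempts, $needed) {
    $attempts[$job] = ($attempts[$job] ?? 0) + 1;
    return $attempts[$job] >= $needed[$job];
};

$result = runCluster(array_keys($needed), $runJob);
echo "Rounds needed: {$result['rounds']}\n";
echo "Statuses: " . implode(', ', $result['status']) . "\n";
```

In a real setup the statuses would live in shared storage, so that children can report their own result.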

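Proof of concept

There is not much more to say about this topic, so let's write some code, continuing the previous sample code.

As with process management, you can use several ways to synchronize your parent and children, but there is no logic involved, so there is no need for an abstraction. I therefore developed a MySQL example (faster to write); you are free to adapt this concept to your own requirements and constraints.

Create the following table in MySQL: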

CREATE TABLE `tasks_manager` (
  `cluster_label` varchar(40),
  `calcul_label` varchar(40),
  `status` enum('waiting', 'running', 'failed', 'success') default 'waiting',
  `updated` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
  PRIMARY KEY  (`cluster_label`, `calcul_label`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Here is the TasksManager.php file (PHP):

<?php

class TasksManager
{

    protected $_cluster_label;
    protected $_calcul_label;
    protected $_sql;

    const WAITING = "waiting";
    const RUNNING = "running";
    const SUCCESS = "success";
    const FAILED = "failed";

    public function __construct($label, PDO $sql)
    {
        $this->_sql = $sql;
        $this->_cluster_label = substr($label, 0, 40);
    }

    public function getClusterLabel()
    {
        return $this->_cluster_label;
    }

    public function getCalculLabel()
    {
        return $this->_calcul_label;
    }

    public function destroy()
    {
        $request = "
            DELETE FROM tasks_manager
            WHERE cluster_label = ?
        ";
        $this->_query($request, $this->_cluster_label);
        return $this;
    }

    public function start($calcul_label)
    {
        $this->_calcul_label = $calcul_label;
        $this->add($calcul_label, TasksManager::RUNNING);
        return $this;
    }

    public function finish($status = TasksManager::SUCCESS)
    {
        if (!$this->_isStatus($status))
        {
            throw new Exception("{$status} is not a valid status.");
        }
        if (is_null($this->_calcul_label))
        {
            throw new Exception("finish() called, but task never started.");
        }
        $request = "
            UPDATE tasks_manager
            SET status = ?
            WHERE cluster_label = ?
            AND calcul_label = ?
         ";
        $this->_query($request, $status, $this->_cluster_label, substr($this->_calcul_label, 0, 40));
        return $this;
    }

    public function add($calcul_label, $status = TasksManager::WAITING)
    {
        if (!$this->_isStatus($status))
        {
            throw new Exception("{$status} is not a valid status.");
        }
        $request = "
            INSERT INTO tasks_manager (
                cluster_label, calcul_label, status
            ) VALUES (
                ?, ?, ?
            )
            ON DUPLICATE KEY UPDATE
                status = ?
        ";
        $calcul_label = substr($calcul_label, 0, 40);
        $this->_query($request, $this->_cluster_label, $calcul_label, $status, $status);
        return $this;
    }

    public function delete($calcul_label)
    {
        $request = "
            DELETE FROM tasks_manager
            WHERE cluster_label = ?
            AND calcul_label = ?
        ";
        $this->_query($request, $this->_cluster_label, substr($calcul_label, 0, 40));
        return $this;
    }

    public function countStatus($status = TasksManager::SUCCESS)
    {
        if (!$this->_isStatus($status))
        {
            throw new Exception("{$status} is not a valid status.");
        }
        $request = "
            SELECT COUNT(*) AS cnt
            FROM tasks_manager
            WHERE cluster_label = ?
            AND status = ?
        ";
        $ret = $this->_query($request, $this->_cluster_label, $status);
        return $ret[0]['cnt'];
    }

    public function count()
    {
        $request = "
            SELECT COUNT(*) AS cnt
            FROM tasks_manager
            WHERE cluster_label = ?
        ";
        $ret = $this->_query($request, $this->_cluster_label);
        return $ret[0]['cnt'];
    }

    public function getCalculsByStatus($status = TasksManager::SUCCESS)
    {
        if (!$this->_isStatus($status))
        {
            throw new Exception("{$status} is not a valid status.");
        }
        $request = "
            SELECT calcul_label
            FROM tasks_manager
            WHERE cluster_label = ?
            AND status = ?
        ";
        $ret = $this->_query($request, $this->_cluster_label, $status);
        $array = array();
        if (!is_null($ret))
        {
            $array = array_map(function($row) {
                return $row['calcul_label'];
            }, $ret);
        }
        return $array;
    }

    public function switchStatus($statusA = TasksManager::RUNNING, $statusB = null)
    {
        if (!$this->_isStatus($statusA))
        {
            throw new Exception("{$statusA} is not a valid status.");
        }
        if ((!is_null($statusB)) && (!$this->_isStatus($statusB)))
        {
            throw new Exception("{$statusB} is not a valid status.");
        }
        if ($statusB != null)
        {
            $request = "
                UPDATE tasks_manager
                SET status = ?
                WHERE cluster_label = ?
                AND status = ?
            ";
            $this->_query($request, $statusB, $this->_cluster_label, $statusA);
        }
        else
        {
            $request = "
                UPDATE tasks_manager
                SET status = ?
                WHERE cluster_label = ?
            ";
            $this->_query($request, $statusA, $this->_cluster_label);
        }
        return $this;
    }

    private function _isStatus($status)
    {
        if (!is_string($status))
        {
            return false;
        }
        return in_array($status, array(
                self::FAILED,
                self::RUNNING,
                self::SUCCESS,
                self::WAITING,
        ));
    }

    protected function _query($request)
    {
        $return = null;

        $stmt = $this->_sql->prepare($request);
        if ($stmt === false)
        {
            return $return;
        }

        $params = func_get_args();
        array_shift($params);

        if ($stmt->execute($params) === false)
        {
            return $return;
        }

        if (strncasecmp(trim($request), 'SELECT', 6) === 0)
        {
            $return = $stmt->fetchAll(PDO::FETCH_ASSOC);
        }

        return $return;
    }

}

task_launcher.php is a usage example (PHP):

<?php

require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
require_once("TasksManager.php");

// Initializing database connection
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Initializing process pool
$pool = new ProcessesPoolMySQL($label = "pool test", $dbh);
$pool->create($max = "10");

// Initializing task manager
$multi = new TasksManager($label = "jobs test", $dbh);
$multi->destroy();

// Simulating jobs
$count = 20;
$todo_list = array ();
for ($i = 0; ($i < $count); $i++)
{
    $todo_list[$i] = "Job {$i}";
    $multi->add($todo_list[$i], TasksManager::WAITING);
}

// Infinite loop until all jobs are done
$continue = true;
while ($continue)
{
    $continue = false;

    echo "Starting to run jobs in queue ...\n";

    // put all failed jobs to WAITING status
    $multi->switchStatus(TasksManager::FAILED, TasksManager::WAITING);

    foreach ($todo_list as $job)
    {

        $ret = $pool->waitForResource($timeout = 10, $interval = 500000, "waitResource");

        if ($ret)
        {
            echo "Executing job: $job\n";
            exec(sprintf("/usr/bin/php ./tasks_program.php %s > /dev/null &", escapeshellarg($job)));
        }
        else
        {
            echo "waitForResource timeout!\n";
            $pool->killAllResources();

            // All jobs currently running are considered dead, so, failed
            $multi->switchStatus(TasksManager::RUNNING, TasksManager::FAILED);

            break;
        }
    }

    $ret = $pool->waitForTheEnd($timeout = 10, $interval = 500000, "waitEnd");
    if ($ret == false)
    {
        echo "waitForTheEnd timeout!\n";
        $pool->killAllResources();

        // All jobs currently running are considered dead, so, failed
        $multi->switchStatus(TasksManager::RUNNING, TasksManager::FAILED);
    }


    echo "All jobs in queue executed, looking for errors...\n";

    // Counts if there is failures
    $nb_failed = $multi->countStatus(TasksManager::FAILED);
    if ($nb_failed > 0)
    {
        $todo_list = $multi->getCalculsByStatus(TasksManager::FAILED);
        echo sprintf("%d jobs failed: %s\n", $nb_failed, implode(', ', $todo_list));
        $continue = true;
    }
}

function waitResource($multi)
{
    echo "Waiting for a resource ....\n";
}

function waitEnd($multi)
{
    echo "Waiting for the end .....\n";
}

// All jobs finished, destroying task manager
$multi->destroy();

// Destroying process pool
$pool->destroy();

echo "Finish.\n";

PHP: here is the child program (the calculation), which the launcher runs as tasks_program.php.

<?php

if (!isset($argv[1]))
{
    die("This program must be called with an identifier (calcul_label)\n");
}
$calcul_label = $argv[1];

require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
require_once("TasksManager.php");

// Initializing database connection
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Initializing process pool (with same label as parent)
$pool = new ProcessesPoolMySQL($label = "pool test", $dbh);

// Takes one resource in pool
$pool->start();

// Initializing task manager (with same label as parent)
$multi = new TasksManager($label = "jobs test", $dbh);
$multi->start($calcul_label);

// Simulating execution time
$secs = (rand() % 2) + 3;
sleep($secs);

// Simulating job status
$status = rand() % 3 == 0 ? TasksManager::FAILED : TasksManager::SUCCESS;

// Job finishes indicating his status
$multi->finish($status);

// Releasing pool's resource
$pool->finish();

Demo output: the full output of this demo is too large to post on SO.


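Synchronization and communication between processes

(Image: example of a communication error)

There are solutions for communicating easily and without errors.

Grab a coffee, we are nearly done!

We can now launch a large number of processes, and they all give the expected result, which is not bad. But so far all our processes run independently and cannot actually communicate with each other. That is your core problem, and there are many solutions.

It is hard to say exactly what kind of communication you need. You mention what you have tried (IPC, communication through files, a home-made protocol), but not what kind of information your processes share. In any case, I invite you to consider an OOP solution.

PHP is powerful.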

PHP has magic methods:

  • __get($property) lets us implement read access to a $property on an object
  • __set($property, $value) lets us implement assignment to a $property on an object
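As a minimal sketch of how these two hooks behave (the Bag class and its $data array are hypothetical names used only for illustration), every read or write of an undefined property is routed through the magic methods:

```php
<?php
// Hypothetical class for illustration: keeps its properties in a private array.
class Bag
{
    private $data = array();

    // Called when code reads a property that is undefined or inaccessible.
    public function __get($property)
    {
        return array_key_exists($property, $this->data) ? $this->data[$property] : null;
    }

    // Called when code writes to a property that is undefined or inaccessible.
    public function __set($property, $value)
    {
        $this->data[$property] = $value;
    }
}

$bag = new Bag();
$bag->answer = 42;         // routed through __set
$value = $bag->answer;     // routed through __get
$missing = $bag->unknown;  // __get returns null for unknown keys
```

Swapping the private array for a file is all it takes to turn this into shared storage.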

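PHP can handle files with concurrent access management:

  • fopen($file, 'c+') opens a file for reading and writing without truncating it (and creates it if missing), which is the mode to use when you want an advisory lock (flock) before modifying the file
  • flock($descriptor, LOCK_SH) acquires a shared lock (for reading)
  • flock($descriptor, LOCK_EX) acquires an exclusive lock (for writing)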
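Here is a small sketch of the two lock types in use. The helper names locked_write and locked_read are made up for this example, and the locks are advisory: they only protect against other code that also calls flock.

```php
<?php
// Hypothetical helpers for illustration; the names are assumptions.
function locked_write($file, $text)
{
    $fd = fopen($file, 'c+');   // read/write, create if missing, no truncation
    if ($fd === false) {
        throw new RuntimeException("Can't open '$file'.");
    }
    flock($fd, LOCK_EX);        // blocks until exclusive access is granted
    ftruncate($fd, 0);          // safe to empty the file now that we hold the lock
    fwrite($fd, $text);
    fflush($fd);                // push the data out before releasing the lock
    flock($fd, LOCK_UN);
    fclose($fd);
}

function locked_read($file)
{
    $fd = fopen($file, 'r');
    if ($fd === false) {
        throw new RuntimeException("Can't open '$file'.");
    }
    flock($fd, LOCK_SH);        // several readers may hold a shared lock at once
    $text = stream_get_contents($fd);
    flock($fd, LOCK_UN);
    fclose($fd);
    return $text;
}

$file = tempnam(sys_get_temp_dir(), 'lock');
locked_write($file, "hello");
locked_write($file, "hi");      // shorter text fully replaces the longer one
$text = locked_read($file);
unlink($file);
```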

Finally, PHP has:

  • json_encode($object) to get the JSON representation of an object
  • json_decode($string) to get an object back from a JSON string
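A quick round trip shows what comes back from each call. The property names below are illustrative only. Note the last line: an empty string decodes to null, which matters when the shared file exists but has not been written yet.

```php
<?php
$shared = new stdClass();
$shared->result1 = 1;
$shared->labels = array('a', 'b');

$json = json_encode($shared);         // {"result1":1,"labels":["a","b"]}
$copy = json_decode($json);           // JSON objects come back as stdClass
$asArray = json_decode($json, true);  // pass true to get associative arrays instead
$broken = json_decode('');            // empty or invalid input yields null
```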

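Do you see where I am going? We will create a Synchro class that works like stdClass, except that it is always safely synchronized to a file. All our processes will be able to access the same instance of that object at the same time.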

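A few Linux system tricks

Of course, if you have 150 processes working on the same file at the same time, your hard drive will slow them down. To get around this, why not create a filesystem partition in RAM? Writing to that file will then be about as fast as writing to memory!

Shell: run the following commands as root: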

mkfs -q /dev/ram1 65536
mkdir -p /ram
mount /dev/ram1 /ram

A few notes:

  • 65536 is in kilobytes, so here you get a 64 MB partition.

  • If you want to mount this partition at startup, create a shell script and call it from the /etc/rc.local file.
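On the PHP side, a script may want to use the RAM-backed mount when it exists and fall back to the regular temporary directory otherwise. This is only a sketch: pick_sync_dir is a hypothetical helper, and /dev/shm is an assumption (many Linux systems mount a tmpfs there, but it is not mentioned above).

```php
<?php
// Hypothetical helper: returns the first writable directory from $candidates,
// falling back to the system temporary directory.
function pick_sync_dir(array $candidates)
{
    foreach ($candidates as $dir) {
        if (is_dir($dir) && is_writable($dir)) {
            return $dir;
        }
    }
    return sys_get_temp_dir();
}

// '/ram' is the mount point created above; '/dev/shm' is an assumed alternative.
$dir = pick_sync_dir(array('/ram', '/dev/shm'));
$path = $dir . '/synchro.txt';
```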

Implementation

PHP: here is the Synchro.php class.

<?php

class Synchro
{

   private $_file;

   public function __construct($file)
   {
       $this->_file = $file;
   }

   public function __get($property)
   {
       // File does not exist
       if (!is_file($this->_file))
       {
           return null;
       }

       // Check if file is readable
       if ((is_file($this->_file)) && (!is_readable($this->_file)))
       {
           throw new Exception(sprintf("File '%s' is not readable.", $this->_file));
       }

       // Open file for reading and writing without truncating it ('c+'), so it can be locked with flock
       if (($fd = fopen($this->_file, 'c+')) === false)
       {
           throw new Exception(sprintf("Can't open '%s' file.", $this->_file));
       }

       // Request a lock for reading (hangs until lock is granted successfully)
       if (flock($fd, LOCK_SH) === false)
       {
           throw new Exception(sprintf("Can't lock '%s' file for reading.", $this->_file));
       }

       // A hand-made file_get_contents (stops at end of file or on a read error)
       $contents = '';
       while (($read = fread($fd, 32 * 1024)) !== '' && $read !== false)
       {
           $contents .= $read;
       }

       // Release shared lock and close file
       flock($fd, LOCK_UN);
       fclose($fd);

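       // Restore shared data object and return requested property
       // (json_decode returns null for an empty or invalid file, so check the type first)
       $object = json_decode($contents);
       if (is_object($object) && property_exists($object, $property))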
       {
           return $object->{$property};
       }

       return null;
   }

   public function __set($property, $value)
   {
       // Check if directory is writable if file does not exist
       if ((!is_file($this->_file)) && (!is_writable(dirname($this->_file))))
       {
           throw new Exception(sprintf("Directory '%s' does not exist or is not writable.", dirname($this->_file)));
       }

       // Check if file is writable if it exists
       if ((is_file($this->_file)) && (!is_writable($this->_file)))
       {
           throw new Exception(sprintf("File '%s' is not writable.", $this->_file));
       }

       // Open file for reading and writing without truncating it ('c+'), so it can be locked with flock
       if (($fd = fopen($this->_file, 'c+')) === false)
       {
           throw new Exception(sprintf("Can't open '%s' file.", $this->_file));
       }

       // Request a lock for writing (hangs until lock is granted successfully)
       if (flock($fd, LOCK_EX) === false)
       {
           throw new Exception(sprintf("Can't lock '%s' file for writing.", $this->_file));
       }

       // A hand-made file_get_contents (stops at end of file or on a read error)
       $contents = '';
       while (($read = fread($fd, 32 * 1024)) !== '' && $read !== false)
       {
           $contents .= $read;
       }

       // Restore shared data object (start fresh if the file is empty or not valid JSON)
       // and set value for desired property
       $object = json_decode($contents);
       if (!is_object($object))
       {
           $object = new stdClass();
       }
       $object->{$property} = $value;

       // Go back to the beginning of the file
       rewind($fd);

       // Empty the file so that a shorter JSON string leaves no stale bytes behind
       ftruncate($fd, 0);

       // Save shared data object to the file and flush before unlocking
       fwrite($fd, json_encode($object));
       fflush($fd);

       // Release exclusive lock and close file
       flock($fd, LOCK_UN);
       fclose($fd);

       return $value;
   }

}
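One thing to keep in mind: each __get and each __set takes its own lock, so a statement such as `$synchro->counter = $synchro->counter + 1` reads under one lock and writes under another, and two processes can interleave and lose an update. The sketch below shows one possible way around that, holding a single exclusive lock for the whole read-modify-write. The function synchro_update and the counter property are hypothetical; they are not part of the class above.

```php
<?php
// Hypothetical helper, not part of the Synchro class.
// Applies $update to the decoded object while holding one exclusive lock.
function synchro_update($file, callable $update)
{
    $fd = fopen($file, 'c+');
    if ($fd === false) {
        throw new RuntimeException("Can't open '$file'.");
    }
    if (!flock($fd, LOCK_EX)) {
        fclose($fd);
        throw new RuntimeException("Can't lock '$file'.");
    }

    $object = json_decode(stream_get_contents($fd));
    if (!is_object($object)) {
        $object = new stdClass();  // empty or invalid file: start fresh
    }

    $update($object);              // objects are passed by handle, so changes stick

    rewind($fd);
    ftruncate($fd, 0);
    fwrite($fd, json_encode($object));
    fflush($fd);
    flock($fd, LOCK_UN);
    fclose($fd);

    return $object;
}

$file = tempnam(sys_get_temp_dir(), 'sync');
for ($i = 0; $i < 3; $i++) {
    synchro_update($file, function ($o) {
        $o->counter = isset($o->counter) ? $o->counter + 1 : 1;
    });
}
$final = json_decode(file_get_contents($file));
unlink($file);
```

When every process writes only its own key and reads keys owned by others, the plain property access offered by Synchro is enough.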

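Demonstration

We will continue (and finish) our process/task example by making our processes communicate with each other.

Rules:

  • Our goal is to get the sum of all numbers between 1 and 20.
  • We have 20 processes, with IDs from 1 to 20.
  • These processes are queued for execution in random order.
  • Every process (except process 1) can do only one calculation: its ID + the previous process's result.
  • Process 1 directly stores its own ID.
  • Each process succeeds if it can do its calculation (that is, if the previous process's result is available); otherwise it fails (and becomes a candidate for a new queue).
  • The pool's timeout expires after 10 seconds.

OK, it looks complicated, but it is actually a good representation of what you will find in real-life situations.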
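To see why the retry loop always terminates, here is a single-process simulation of the rules, with no pool, no database and no file. It is a simplification under stated assumptions: a plain array stands in for the shared object, and jobs run one after another, whereas in the real demo they run concurrently and timing also decides which ones fail.

```php
<?php
// Single-process simulation of the rules above (illustrative only).
$results = array();   // stands in for the shared Synchro object
$todo = range(1, 20);
$rounds = 0;

while (count($todo) > 0) {
    $rounds++;
    shuffle($todo);   // jobs are queued in random order
    $failed = array();

    foreach ($todo as $id) {
        if ($id == 1) {
            $results[1] = 1;                          // job 1 stores its own id
        } elseif (isset($results[$id - 1])) {
            $results[$id] = $results[$id - 1] + $id;  // previous result is ready
        } else {
            $failed[] = $id;                          // retry in the next round
        }
    }

    $todo = $failed;  // only failed jobs are queued again
}

echo "Sum of 1..20 = {$results[20]} after {$rounds} round(s)\n";
```

In every round at least the lowest unfinished ID succeeds, so the queue shrinks each time and the final value is always 210.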

PHP: the synchro_launcher.php file.

<?php

require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
require_once("TasksManager.php");
require_once("Synchro.php");

// Removing old synchronized object
if (is_file("/tmp/synchro.txt"))
{
    unlink("/tmp/synchro.txt");
}

// Initializing database connection
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Initializing process pool
$pool = new ProcessesPoolMySQL($label = "synchro pool", $dbh);
$pool->create($max = "10");

// Initializing task manager
$multi = new TasksManager($label = "synchro tasks", $dbh);
$multi->destroy();

// Simulating jobs
$todo_list = array ();
for ($i = 1; ($i <= 20); $i++)
{
    $todo_list[$i] = $i;
    $multi->add($todo_list[$i], TasksManager::WAITING);
}

// Infinite loop until all jobs are done
$continue = true;
while ($continue)
{
    $continue = false;

    echo "Starting to run jobs in queue ...\n";

    // Shuffle all jobs (else this will be too easy :-))
    shuffle($todo_list);

    // put all failed jobs to WAITING status
    $multi->switchStatus(TasksManager::FAILED, TasksManager::WAITING);

    foreach ($todo_list as $job)
    {

        $ret = $pool->waitForResource($timeout = 10, $interval = 500000, "waitResource");

        if ($ret)
        {
            echo "Executing job: $job\n";
            exec(sprintf("/usr/bin/php ./synchro_program.php %s > /dev/null &", escapeshellarg($job)));
        }
        else
        {
            echo "waitForResource timeout!\n";
            $pool->killAllResources();

            // All jobs currently running are considered dead, so, failed
            $multi->switchStatus(TasksManager::RUNNING, TasksManager::FAILED);

            break;
        }
    }

    $ret = $pool->waitForTheEnd($timeout = 10, $interval = 500000, "waitEnd");
    if ($ret == false)
    {
        echo "waitForTheEnd timeout!\n";
        $pool->killAllResources();

        // All jobs currently running are considered dead, so, failed
        $multi->switchStatus(TasksManager::RUNNING, TasksManager::FAILED);
    }


    echo "All jobs in queue executed, looking for errors...\n";

    // Counts if there is failures
    $multi->switchStatus(TasksManager::WAITING, TasksManager::FAILED);
    $nb_failed = $multi->countStatus(TasksManager::FAILED);
    if ($nb_failed > 0)
    {
        $todo_list = $multi->getCalculsByStatus(TasksManager::FAILED);
        echo sprintf("%d jobs failed: %s\n", $nb_failed, implode(', ', $todo_list));
        $continue = true;
    }
}

function waitResource($multi)
{
    echo "Waiting for a resource ....\n";
}

function waitEnd($multi)
{
    echo "Waiting for the end .....\n";
}

// All jobs finished, destroying task manager
$multi->destroy();

// Destroying process pool
$pool->destroy();

// Recovering final result
$synchro = new Synchro("/tmp/synchro.txt");
echo sprintf("Result of the sum of all numbers between 1 and 20 included is: %d\n", $synchro->result20);

echo "Finish.\n";

PHP: and its associated child program, saved as synchro_program.php (the name the launcher executes).

<?php

if (!isset($argv[1]))
{
   die("This program must be called with an identifier (calcul_label)\n");
}
$current_id = $argv[1];

require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
require_once("TasksManager.php");
require_once("Synchro.php");

// Initializing database connection
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// Initializing process pool (with same label as parent)
$pool = new ProcessesPoolMySQL($label = "synchro pool", $dbh);

// Takes one resource in pool
$pool->start();

// Initializing task manager (with same label as parent)
$multi = new TasksManager($label = "synchro tasks", $dbh);
$multi->start($current_id);

// ------------------------------------------------------
// Job begins here

$synchro = new Synchro("/tmp/synchro.txt");

if ($current_id == 1)
{
   $synchro->result1 = 1;
   $status = TasksManager::SUCCESS;
}
else
{
   $previous_id = $current_id - 1;
   if (is_null($synchro->{"result{$previous_id}"}))
   {
       $status = TasksManager::FAILED;
   }
   else
   {
       $synchro->{"result{$current_id}"} = $synchro->{"result{$previous_id}"} + $current_id;
       $status = TasksManager::SUCCESS;
   }
}

// ------------------------------------------------------

// Job finishes indicating his status
$multi->finish($status);

// Releasing pool's resource
$pool->finish();

Output

The full output of this demo is too large to post on SO.


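Conclusion

Task management in PHP is not easy because of the lack of threads. Like many developers, I hope this will be built in someday. Anyway, it is possible to control resources and results and to share data between processes, so with some work we can do task management efficiently, I think.

Synchronization and communication can be done in several ways, but you need to weigh the pros and cons of each against your constraints and requirements. For example:

  • If you need to launch 500 tasks at once and want to use the MySQL synchronization method, you will need 1+500 simultaneous connections to the database (it may not appreciate that).

  • If you need to share a large amount of data, using a single file may be inefficient.

  • If you use files for synchronization, do not forget to look at the system's built-in tools, such as /dev/sdram.

  • Try to stay within object-oriented programming as much as possible to handle your troubles. Home-made protocols and the like will make your application harder to maintain.

I gave you my 2 cents on this interesting topic, and I hope it gives you some ideas to solve your problem.

answered 2013-05-16T19:21:22.283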
2

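I suggest you take a look at this library called PHP-Queue: https://github.com/CoderKungfu/php-queue

A short description from its GitHub page:

A unified front-end for different queuing backends. Includes a REST server, CLI interface and daemon runners.

See its GitHub page for more details.

With a little tweaking, I think this library will help you solve your problem.

Hope this helps.

answered 2013-05-17T07:27:57.990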