第 2 页:SO 答案的正文限制为 30k 个字符,所以我需要创建一个新的。
控制结果
不允许出错!
是的 !您可以在不考虑资源的情况下启动大量进程。但是如果一个子进程失败了怎么办?将有一项未完成或未完成的工作!...
事实上,这比控制流程执行更简单(简单得多)。我们有一个使用池执行的作业队列,我们只需要知道一个作业在执行后是失败还是成功。如果执行整个池时出现故障,则将失败的进程放在新的池中,并再次执行。
如何在 PHP 中进行?
这个原理基于集群:队列包含多个作业,但只代表一个实体。集群的每个计算都应该成功完成该实体。
路线图:
1我们创建一个包含集群所有计算的待办事项列表(与队列不匹配,用于进程管理)。每个作业都有一个状态:waiting(未执行)、running(已执行但未完成)、success和error(根据它们的结果),当然在这一步,它们的状态是WAITING。
2我们使用进程管理器运行所有作业(以保持对资源的控制),每个作业都以告诉任务管理器它运行开始,并根据他自己的上下文,以指示他的状态(失败或成功)结束。
3当整个队列执行完毕后,任务管理器会创建一个包含失败作业的新队列,然后再次循环。
4当所有工作都成功时,你就完成了,你确定没有出错。您的集群已完成,您的实体可在上层使用。
概念证明
关于这个主题没有什么可说的了,所以让我们编写一些代码,继续前面的示例代码。
至于进程管理,你可以使用几种方式来同步你的父母和孩子,但是没有硬逻辑,所以不需要抽象。因此,我开发了一个 MySQL 示例(编写速度更快),您可以根据自己的要求和约束自由调整这个概念。
MySQL创建下表:
CREATE TABLE `tasks_manager` (
`cluster_label` varchar(40),
`calcul_label` varchar(40),
`status` enum('waiting', 'running', 'failed', 'success') default 'waiting',
`updated` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (`cluster_label`, `calcul_label`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
PHP这是TaskManager.php
文件:
<?php
class TasksManager
{
protected $_cluster_label;
protected $_calcul_label;
protected $_sql;
const WAITING = "waiting";
const RUNNING = "running";
const SUCCESS = "success";
const FAILED = "failed";
public function __construct($label, PDO $sql)
{
$this->_sql = $sql;
$this->_cluster_label = substr($label, 0, 40);
}
public function getClusterLabel()
{
return $this->_cluster_label;
}
public function getCalculLabel()
{
return $this->_calcul_label;
}
public function destroy()
{
$request = "
DELETE FROM tasks_manager
WHERE cluster_label = ?
";
$this->_query($request, $this->_cluster_label);
return $this;
}
public function start($calcul_label)
{
$this->_calcul_label = $calcul_label;
$this->add($calcul_label, TasksManager::RUNNING);
return $this;
}
public function finish($status = TasksManager::SUCCESS)
{
if (!$this->_isStatus($status))
{
throw new Exception("{$status} is not a valid status.");
}
if (is_null($this->_cluster_label))
{
throw new Exception("finish() called, but task never started.");
}
$request = "
UPDATE tasks_manager
SET status = ?
WHERE cluster_label = ?
AND calcul_label = ?
";
$this->_query($request, $status, $this->_cluster_label, substr($this->_calcul_label, 0, 40));
return $this;
}
public function add($calcul_label, $status = TasksManager::WAITING)
{
if (!$this->_isStatus($status))
{
throw new Exception("{$status} is not a valid status.");
}
$request = "
INSERT INTO tasks_manager (
cluster_label, calcul_label, status
) VALUES (
?, ?, ?
)
ON DUPLICATE KEY UPDATE
status = ?
";
$calcul_label = substr($calcul_label, 0, 40);
$this->_query($request, $this->_cluster_label, $calcul_label, $status, $status);
return $this;
}
public function delete($calcul_label)
{
$request = "
DELETE FROM tasks_manager
WHERE cluster_label = ?
AND calcul_label = ?
";
$this->_query($request, $this->_cluster_label, substr($calcul_label, 0, 40));
return $this;
}
public function countStatus($status = TasksManager::SUCCESS)
{
if (!$this->_isStatus($status))
{
throw new Exception("{$status} is not a valid status.");
}
$request = "
SELECT COUNT(*) AS cnt
FROM tasks_manager
WHERE cluster_label = ?
AND status = ?
";
$ret = $this->_query($request, $this->_cluster_label, $status);
return $ret[0]['cnt'];
}
public function count()
{
$request = "
SELECT COUNT(id) AS cnt
FROM tasks_manager
WHERE cluster_label = ?
";
$ret = $this->_query($request, $this->_cluster_label);
return $ret[0]['cnt'];
}
public function getCalculsByStatus($status = TasksManager::SUCCESS)
{
if (!$this->_isStatus($status))
{
throw new Exception("{$status} is not a valid status.");
}
$request = "
SELECT calcul_label
FROM tasks_manager
WHERE cluster_label = ?
AND status = ?
";
$ret = $this->_query($request, $this->_cluster_label, $status);
$array = array();
if (!is_null($ret))
{
$array = array_map(function($row) {
return $row['calcul_label'];
}, $ret);
}
return $array;
}
public function switchStatus($statusA = TasksManager::RUNNING, $statusB = null)
{
if (!$this->_isStatus($statusA))
{
throw new Exception("{$statusA} is not a valid status.");
}
if ((!is_null($statusB)) && (!$this->_isStatus($statusB)))
{
throw new Exception("{$statusB} is not a valid status.");
}
if ($statusB != null)
{
$request = "
UPDATE tasks_manager
SET status = ?
WHERE cluster_label = ?
AND status = ?
";
$this->_query($request, $statusB, $this->_cluster_label, $statusA);
}
else
{
$request = "
UPDATE tasks_manager
SET status = ?
WHERE cluster_label = ?
";
$this->_query($request, $statusA, $this->_cluster_label);
}
return $this;
}
private function _isStatus($status)
{
if (!is_string($status))
{
return false;
}
return in_array($status, array(
self::FAILED,
self::RUNNING,
self::SUCCESS,
self::WAITING,
));
}
protected function _query($request)
{
$return = null;
$stmt = $this->_sql->prepare($request);
if ($stmt === false)
{
return $return;
}
$params = func_get_args();
array_shift($params);
if ($stmt->execute($params) === false)
{
return $return;
}
if (strncasecmp(trim($request), 'SELECT', 6) === 0)
{
$return = $stmt->fetchAll(PDO::FETCH_ASSOC);
}
return $return;
}
}
PHP这task_launcher.php
是使用示例
<?php
require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
require_once("TasksManager.php");
// Initializing database connection
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// Initializing process pool
$pool = new ProcessesPoolMySQL($label = "pool test", $dbh);
$pool->create($max = "10");
// Initializing task manager
$multi = new TasksManager($label = "jobs test", $dbh);
$multi->destroy();
// Simulating jobs
$count = 20;
$todo_list = array ();
for ($i = 0; ($i < $count); $i++)
{
$todo_list[$i] = "Job {$i}";
$multi->add($todo_list[$i], TasksManager::WAITING);
}
// Infinite loop until all jobs are done
$continue = true;
while ($continue)
{
$continue = false;
echo "Starting to run jobs in queue ...\n";
// put all failed jobs to WAITING status
$multi->switchStatus(TasksManager::FAILED, TasksManager::WAITING);
foreach ($todo_list as $job)
{
$ret = $pool->waitForResource($timeout = 10, $interval = 500000, "waitResource");
if ($ret)
{
echo "Executing job: $job\n";
exec(sprintf("/usr/bin/php ./tasks_program.php %s > /dev/null &", escapeshellarg($job)));
}
else
{
echo "waitForResource timeout!\n";
$pool->killAllResources();
// All jobs currently running are considered dead, so, failed
$multi->switchStatus(TasksManager::RUNNING, TasksManager::FAILED);
break;
}
}
$ret = $pool->waitForTheEnd($timeout = 10, $interval = 500000, "waitEnd");
if ($ret == false)
{
echo "waitForTheEnd timeout!\n";
$pool->killAllResources();
// All jobs currently running are considered dead, so, failed
$multi->switchStatus(TasksManager::RUNNING, TasksManager::FAILED);
}
echo "All jobs in queue executed, looking for errors...\n";
// Counts if there is failures
$nb_failed = $multi->countStatus(TasksManager::FAILED);
if ($nb_failed > 0)
{
$todo_list = $multi->getCalculsByStatus(TasksManager::FAILED);
echo sprintf("%d jobs failed: %s\n", $nb_failed, implode(', ', $todo_list));
$continue = true;
}
}
function waitResource($multi)
{
echo "Waiting for a resource ....\n";
}
function waitEnd($multi)
{
echo "Waiting for the end .....\n";
}
// All jobs finished, destroying task manager
$multi->destroy();
// Destroying process pool
$pool->destroy();
echo "Finish.\n";
PHP这是子程序(计算)
<?php
if (!isset($argv[1]))
{
die("This program must be called with an identifier (calcul_label)\n");
}
$calcul_label = $argv[1];
require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
require_once("TasksManager.php");
// Initializing database connection
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// Initializing process pool (with same label as parent)
$pool = new ProcessesPoolMySQL($label = "pool test", $dbh);
// Takes one resource in pool
$pool->start();
// Initializing task manager (with same label as parent)
$multi = new TasksManager($label = "jobs test", $dbh);
$multi->start($calcul_label);
// Simulating execution time
$secs = (rand() % 2) + 3;
sleep($secs);
// Simulating job status
$status = rand() % 3 == 0 ? TasksManager::FAILED : TasksManager::SUCCESS;
// Job finishes indicating his status
$multi->finish($status);
// Releasing pool's resource
$pool->finish();
Demo output这个演示会给你这样的东西(对于 SO 来说太大了)。
进程之间的同步和通信
有解决方案可以轻松无错误地进行通信。
喝杯咖啡,我们接近尾声了!
我们现在能够启动大量进程,它们都给出了预期的结果,嗯,这还不错。但是现在,我们所有的进程都是独立执行的,它们实际上无法相互通信。那是您的核心问题,并且有很多解决方案。
很难确切地告诉你你需要什么样的沟通。你在谈论你尝试过的东西(IPC、使用文件的通信或自制协议),但不是在你的进程之间共享什么样的信息。无论如何,我邀请您考虑 OOP 解决方案。
PHP 很强大。
PHP 有神奇的方法:
__get($property)
让我们实现$property
对对象的 a 访问
__set($property, $value)
让我们$property
在对象上实现 a 的分配
PHP 可以处理文件,具有并发访问管理
fopen($file, 'c+')
打开一个启用了咨询锁定选项的文件(允许您使用flock)
flock($descriptor, LOCK_SH)
获取共享锁(用于读取)
flock($descriptor, LOCK_EX)
获取排他锁(用于写入)
最后,PHP 具有:
json_encode($object)
获取对象的 json 表示
json_decode($string)
从 json 字符串中取回一个对象
你看到我要去哪里了吗?我们将创建一个Synchro
类,它的工作方式与stdClass
该类相同,但它始终会安全地同步到一个文件中。我们的进程将能够同时访问该对象的同一个实例。
一些 Linux 系统技巧
当然,如果您有 150 个进程同时处理同一个文件,您的硬盘驱动器会减慢您的进程。为了解决这个问题,为什么不在 RAM 上创建一个文件系统分区呢?写入该文件将与写入内存一样快!
shell以 root 身份输入以下命令:
mkfs -q /dev/ram1 65536
mkdir -p /ram
mount /dev/ram1 /ram
一些注意事项:
执行
PHP这是Synchro.php
课程。
<?php
class Synchro
{
private $_file;
public function __construct($file)
{
$this->_file = $file;
}
public function __get($property)
{
// File does not exist
if (!is_file($this->_file))
{
return null;
}
// Check if file is readable
if ((is_file($this->_file)) && (!is_readable($this->_file)))
{
throw new Exception(sprintf("File '%s' is not readable.", $this->_file));
}
// Open file with advisory lock option enabled for reading and writting
if (($fd = fopen($this->_file, 'c+')) === false)
{
throw new Exception(sprintf("Can't open '%s' file.", $this->_file));
}
// Request a lock for reading (hangs until lock is granted successfully)
if (flock($fd, LOCK_SH) === false)
{
throw new Exception(sprintf("Can't lock '%s' file for reading.", $this->_file));
}
// A hand-made file_get_contents
$contents = '';
while (($read = fread($fd, 32 * 1024)) !== '')
{
$contents .= $read;
}
// Release shared lock and close file
flock($fd, LOCK_UN);
fclose($fd);
// Restore shared data object and return requested property
$object = json_decode($contents);
if (property_exists($object, $property))
{
return $object->{$property};
}
return null;
}
public function __set($property, $value)
{
// Check if directory is writable if file does not exist
if ((!is_file($this->_file)) && (!is_writable(dirname($this->_file))))
{
throw new Exception(sprintf("Directory '%s' does not exist or is not writable.", dirname($this->_file)));
}
// Check if file is writable if it exists
if ((is_file($this->_file)) && (!is_writable($this->_file)))
{
throw new Exception(sprintf("File '%s' is not writable.", $this->_file));
}
// Open file with advisory lock option enabled for reading and writting
if (($fd = fopen($this->_file, 'c+')) === false)
{
throw new Exception(sprintf("Can't open '%s' file.", $this->_file));
}
// Request a lock for writting (hangs until lock is granted successfully)
if (flock($fd, LOCK_EX) === false)
{
throw new Exception(sprintf("Can't lock '%s' file for writing.", $this->_file));
}
// A hand-made file_get_contents
$contents = '';
while (($read = fread($fd, 32 * 1024)) !== '')
{
$contents .= $read;
}
// Restore shared data object and set value for desired property
if (empty($contents))
{
$object = new stdClass();
}
else
{
$object = json_decode($contents);
}
$object->{$property} = $value;
// Go back at the beginning of file
rewind($fd);
// Truncate file
ftruncate($fd, strlen($contents));
// Save shared data object to the file
fwrite($fd, json_encode($object));
// Release exclusive lock and close file
flock($fd, LOCK_UN);
fclose($fd);
return $value;
}
}
示范
我们将通过让我们的流程相互通信来继续(并完成)我们的流程/任务示例。
规则:
- 我们的目标是得到 1 到 20 之间所有数字的总和。
- 我们有 20 个进程,ID 从 1 到 20。
- 这些进程随机排队等待执行。
- 每个进程(除了进程 1)只能做一个计算:它的 id + 前一个进程的结果
- 流程1直接把他的id
- 如果每个进程可以进行计算,则每个进程都会成功(意味着,如果前一个进程的结果可用),否则它会失败(并且是新队列的候选者)
- 池的超时时间在 10 秒后到期
好吧,它看起来很复杂,但实际上,它很好地代表了您在现实生活中会发现的情况。
PHP synchro_launcher.php
文件。
<?php
require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
require_once("TasksManager.php");
require_once("Synchro.php");
// Removing old synchroized object
if (is_file("/tmp/synchro.txt"))
{
unlink("/tmp/synchro.txt");
}
// Initializing database connection
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// Initializing process pool
$pool = new ProcessesPoolMySQL($label = "synchro pool", $dbh);
$pool->create($max = "10");
// Initializing task manager
$multi = new TasksManager($label = "synchro tasks", $dbh);
$multi->destroy();
// Simulating jobs
$todo_list = array ();
for ($i = 1; ($i <= 20); $i++)
{
$todo_list[$i] = $i;
$multi->add($todo_list[$i], TasksManager::WAITING);
}
// Infinite loop until all jobs are done
$continue = true;
while ($continue)
{
$continue = false;
echo "Starting to run jobs in queue ...\n";
// Shuffle all jobs (else this will be too easy :-))
shuffle($todo_list);
// put all failed jobs to WAITING status
$multi->switchStatus(TasksManager::FAILED, TasksManager::WAITING);
foreach ($todo_list as $job)
{
$ret = $pool->waitForResource($timeout = 10, $interval = 500000, "waitResource");
if ($ret)
{
echo "Executing job: $job\n";
exec(sprintf("/usr/bin/php ./synchro_program.php %s > /dev/null &", escapeshellarg($job)));
}
else
{
echo "waitForResource timeout!\n";
$pool->killAllResources();
// All jobs currently running are considered dead, so, failed
$multi->switchStatus(TasksManager::RUNNING, TasksManager::FAILED);
break;
}
}
$ret = $pool->waitForTheEnd($timeout = 10, $interval = 500000, "waitEnd");
if ($ret == false)
{
echo "waitForTheEnd timeout!\n";
$pool->killAllResources();
// All jobs currently running are considered dead, so, failed
$multi->switchStatus(TasksManager::RUNNING, TasksManager::FAILED);
}
echo "All jobs in queue executed, looking for errors...\n";
// Counts if there is failures
$multi->switchStatus(TasksManager::WAITING, TasksManager::FAILED);
$nb_failed = $multi->countStatus(TasksManager::FAILED);
if ($nb_failed > 0)
{
$todo_list = $multi->getCalculsByStatus(TasksManager::FAILED);
echo sprintf("%d jobs failed: %s\n", $nb_failed, implode(', ', $todo_list));
$continue = true;
}
}
function waitResource($multi)
{
echo "Waiting for a resource ....\n";
}
function waitEnd($multi)
{
echo "Waiting for the end .....\n";
}
// All jobs finished, destroying task manager
$multi->destroy();
// Destroying process pool
$pool->destroy();
// Recovering final result
$synchro = new Synchro("/tmp/synchro.txt");
echo sprintf("Result of the sum of all numbers between 1 and 20 included is: %d\n", $synchro->result20);
echo "Finish.\n";
PHP及其相关synchro_calcul.php
文件。
<?php
if (!isset($argv[1]))
{
die("This program must be called with an identifier (calcul_label)\n");
}
$current_id = $argv[1];
require_once("AbstractProcessesPool.php");
require_once("ProcessesPoolMySQL.php");
require_once("TasksManager.php");
require_once("Synchro.php");
// Initializing database connection
$dbh = new PDO("mysql:host=127.0.0.1;dbname=fuz", 'root', 'root');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// Initializing process pool (with same label as parent)
$pool = new ProcessesPoolMySQL($label = "synchro pool", $dbh);
// Takes one resource in pool
$pool->start();
// Initializing task manager (with same label as parent)
$multi = new TasksManager($label = "synchro tasks", $dbh);
$multi->start($current_id);
// ------------------------------------------------------
// Job begins here
$synchro = new Synchro("/tmp/synchro.txt");
if ($current_id == 1)
{
$synchro->result1 = 1;
$status = TasksManager::SUCCESS;
}
else
{
$previous_id = $current_id - 1;
if (is_null($synchro->{"result{$previous_id}"}))
{
$status = TasksManager::FAILED;
}
else
{
$synchro->{"result{$current_id}"} = $synchro->{"result{$previous_id}"} + $current_id;
$status = TasksManager::SUCCESS;
}
}
// ------------------------------------------------------
// Job finishes indicating his status
$multi->finish($status);
// Releasing pool's resource
$pool->finish();
输出
以下演示将为您提供类似这样的输出(对于 SO 来说太大了)
结论
由于缺少线程,PHP 中的任务管理并不容易。与许多开发人员一样,我希望有一天会内置此功能。无论如何,这可以控制资源和结果,并在进程之间共享数据,因此我们可以通过一些我假设的工作来有效地进行任务管理。
同步和通信可以通过多种方式完成,但您需要根据您的限制和要求检查每种方式的优缺点。例如:
如果您需要一次启动 500 个任务并想使用 MySQL 同步方法,则需要 1+500 个同时连接到数据库(可能不会很感激)。
如果您需要共享大量数据,仅使用一个文件可能效率低下。
如果您使用文件进行同步,请不要忘记查看系统内置工具,例如/dev/sdram
.
尝试尽可能多地停留在面向对象的编程中以处理您的麻烦。自制协议等会使您的应用程序更难维护。
关于这个有趣的主题,我给了你 2 美分,我希望它能给你一些想法来解决你的问题。