问题:我想实现几个 php-worker 进程,它们在 MQ 服务器队列上侦听异步作业。现在的问题是,简单地将这些进程作为服务器上的守护进程运行并不能真正让我对实例进行任何级别的控制(负载、状态、锁定)......除了转储 ps -aux 之外。因此,我正在寻找某种运行时环境,可以让我在系统(进程)级别或更高层(某种 Java 风格的应用服务器)上监视和控制实例
任何指针?
问题:我想实现几个 php-worker 进程,它们在 MQ 服务器队列上侦听异步作业。现在的问题是,简单地将这些进程作为服务器上的守护进程运行并不能真正让我对实例进行任何级别的控制(负载、状态、锁定)......除了转储 ps -aux 之外。因此,我正在寻找某种运行时环境,可以让我在系统(进程)级别或更高层(某种 Java 风格的应用服务器)上监视和控制实例
任何指针?
这是一些可能有用的代码。
<?
define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '/path/to/your/processor');
set_time_limit(0);
$cycles = 0;
$run = true;
$reload = false;
declare(ticks = 30);
function signal_handler($signal) {
switch($signal) {
case SIGTERM :
global $run;
$run = false;
break;
case SIGHUP :
global $reload;
$reload = true;
break;
}
}
pcntl_signal(SIGTERM, 'signal_handler');
pcntl_signal(SIGHUP, 'signal_handler');
function spawn_processor() {
$pid = pcntl_fork();
if($pid) {
global $processors;
$processors[] = $pid;
} else {
if(posix_setsid() == -1)
die("Forked process could not detach from terminal\n");
fclose(stdin);
fclose(stdout);
fclose(stderr);
pcntl_exec(PROCESSOR_EXECUTABLE);
die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n");
}
}
function spawn_processors() {
global $processors;
if($processors)
kill_processors();
$processors = array();
for($ix = 0; $ix < WANT_PROCESSORS; $ix++)
spawn_processor();
}
function kill_processors() {
global $processors;
foreach($processors as $processor)
posix_kill($processor, SIGTERM);
foreach($processors as $processor)
pcntl_waitpid($processor);
unset($processors);
}
function check_processors() {
global $processors;
$valid = array();
foreach($processors as $processor) {
pcntl_waitpid($processor, $status, WNOHANG);
if(posix_getsid($processor))
$valid[] = $processor;
}
$processors = $valid;
if(count($processors) > WANT_PROCESSORS) {
for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
posix_kill($processors[$ix], SIGTERM);
for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
pcntl_waitpid($processors[$ix]);
} elseif(count($processors) < WANT_PROCESSORS) {
for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
spawn_processor();
}
}
spawn_processors();
while($run) {
$cycles++;
if($reload) {
$reload = false;
kill_processors();
spawn_processors();
} else {
check_processors();
}
usleep(150000);
}
kill_processors();
pcntl_wait();
?>
听起来您已经在 *nix 系统上启动并运行了一个 MQ,只是想要一种管理工作人员的方法。
一个非常简单的方法是使用 GNU screen。要启动 10 个工作人员,您可以使用:
#!/bin/sh
for x in `seq 1 10` ; do
screen -dmS worker_$x php /path/to/script.php worker$x
end
这将使用名为 worker_1、2、3 等的屏幕在后台启动 10 个工作人员。
您可以通过运行 screen -r worker_ 重新连接到屏幕,并使用 screen -list 列出正在运行的工作人员。
有关更多信息,本指南可能会有所帮助: http ://www.kuro5hin.org/story/2004/3/9/16838/14935
也试试:
对于生产服务器,我通常建议使用正常的系统启动脚本,但多年来我一直在从启动脚本运行屏幕命令,没有任何问题。
PHP 的 pcntl 插件类型服务器守护进程
你真的需要它持续运行吗?
如果您只想根据请求生成新进程,则可以将其注册为 xinetd 中的服务。
Bellow 是我们对@chaos answer 的工作实现。处理信号的代码被删除,因为这个脚本通常只有几毫秒。
此外,在代码中,我们添加了 2 个函数来在调用之间保存 pid:restore_processors_state() 和 save_processors_state()。我们在redis
那里使用过,但您可以决定对文件使用实现。
我们使用 cron 每分钟运行一次这个脚本。Cron 检查所有进程是否还活着。如果不是 - 它重新运行它们然后死亡。如果我们想杀死现有的进程,那么我们只需使用参数运行这个脚本kill
:php script.php kill
。
无需将脚本注入 init.d 即可运行工作程序的非常方便的方法。
<?php
include_once dirname( __FILE__ ) . '/path/to/bootstrap.php';
define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '' . dirname(__FILE__) . '/path/to/worker.php');
set_time_limit(0);
$run = true;
$reload = false;
declare(ticks = 30);
function restore_processors_state()
{
global $processors;
$redis = Zend_Registry::get('redis');
$pids = $redis->hget('worker_procs', 'pids');
if( !$pids )
{
$processors = array();
}
else
{
$processors = json_decode($pids, true);
}
}
function save_processors_state()
{
global $processors;
$redis = Zend_Registry::get('redis');
$redis->hset('worker_procs', 'pids', json_encode($processors));
}
function spawn_processor() {
$pid = pcntl_fork();
if($pid) {
global $processors;
$processors[] = $pid;
} else {
if(posix_setsid() == -1)
die("Forked process could not detach from terminal\n");
fclose(STDIN);
fclose(STDOUT);
fclose(STDERR);
pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE));
die('Failed to fork ' . PROCESSOR_EXECUTABLE . "\n");
}
}
function spawn_processors() {
restore_processors_state();
check_processors();
save_processors_state();
}
function kill_processors() {
global $processors;
foreach($processors as $processor)
posix_kill($processor, SIGTERM);
foreach($processors as $processor)
pcntl_waitpid($processor, $trash);
unset($processors);
}
function check_processors() {
global $processors;
$valid = array();
foreach($processors as $processor) {
pcntl_waitpid($processor, $status, WNOHANG);
if(posix_getsid($processor))
$valid[] = $processor;
}
$processors = $valid;
if(count($processors) > WANT_PROCESSORS) {
for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
posix_kill($processors[$ix], SIGTERM);
for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
pcntl_waitpid($processors[$ix], $trash);
}
elseif(count($processors) < WANT_PROCESSORS) {
for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
spawn_processor();
}
}
if( isset($argv) && count($argv) > 1 ) {
if( $argv[1] == 'kill' ) {
restore_processors_state();
kill_processors();
save_processors_state();
exit(0);
}
}
spawn_processors();