0

我正在运行一个需要处理一些“繁重工作”任务(从 30 秒到 3 分钟)的 webapp (php)。我知道它不是很重,但是我不能让我的用户等待他们,所以我设置了一个内部 API,例如:http://localhost/process-picture/745884/并将此操作存储在 MySQL 表中。

现在我想创建一个“进程”来获取那个 MySQL 表并执行最旧的排队操作,一旦完成,它就会得到下一个,依此类推。

首先,我想制作一个通过 cURL 调用系统的 PHP 脚本,如下所示:

fetchOperation.php连接到 DB 并获取操作的 URL 以通过 cURL 调用它。每个操作:执行自己,然后从队列中删除自己并fetchOperation.php再次调用 (cURL)。

我觉得这个系统有点棘手,所以我想知道是否有(以及用哪种语言编写)任何方式来设置每 15 秒检查一次数据库的后台进程,并执行以下操作:

  • 删除所有标记为 的行DONE
  • 检查是否有任何标记为 的行PROCESSING,如果是,则退出并等待接下来的 15 秒。
  • 如果没有PROCESSING行,则触发最旧的行PENDING(FIFO 队列)。

这样我可以随时管理正在处理的内容,甚至控制服务器负载(例如,晚上最多允许拥有三个PROCESSING项目)。

抱歉这么长的解释,提前谢谢!

4

2 回答 2

4

听起来您需要使用cron安排作业。cron 只会运行一个脚本/程序,因此它的实现将与调度本身不同。

Cron 只会触发/忘记,因此您可以从中调用任何持续时间的进程(我正在查看您在下面的执行时间评论 - 如果我误解了请修改)

于 2012-09-28T09:19:33.563 回答
0

最后我这样解决了:

第一个。以可通过 URL 调用的方式设计操作。例如:http://localhost/render-image/14523665

第二。将挂起的操作存储在如下表中:操作(操作 ID、操作 URL、操作状态)。

准备就绪后,我设计了一个简短的 PHP 脚本,它执行以下操作:

<?php
if( WORKER_IS_ONLINE !== true ) exit();
OperationsQueue::CleanQueue();
while( OperationsQueue::RunningOperations() < WORKER_MAX_OPERATIONS ) {
    launch_operation( OperationsQueue::DequeOperation() );
}
sleep( WORKER_SLEEP_TIME );
launch_operation( '/operations_queue/deque' );

这个辅助函数(launch_operationOperationsQueue类)执行如下(注意它尚未实现):

<?php 
define('WORKER_SLEEP_TIME', 15);     // Time to sleep between sweeps.
define('WORKER_HOST', 'localhost');  // Hostname where the operations will be performed.
define('WORKER_PORT', 80);       // Port where the operations will be performed.
define('WORKER_SOCKET_TIMEOUT', 80); // Port where the operations will be performed.
define('WORKER_MAX_OPERATIONS', 2);  // Max simultaneous operations.
define('WORKER_MAX_LAUNCHES', 2);    // Max operations launched within a sweep.
define('WORKER_IS_ONLINE', false);   // Bool value that marks whether the worker must be working or not.

function launch_operation($operation) {
    $fp = fsockopen(WORKER_HOST, WORKER_PORT, $errno, $errstr, WORKER_SOCKET_TIMEOUT);
    if ($fp) {
        $out  = 'GET ' . $operation . " HTTP/1.1\r\n";
        $out .= 'Host: ' . WORKER_HOST . "\r\n\r\n";
        fwrite($fp, $out);
        fclose($fp);
    }
}

class OperationsQueue {

    public static function RunningOperations(){
        // connect to DB an perform: SELECT COUNT(*) FROM operations WHERE status = 'PROCESSING';
        return 1;
    }
    public static function CleanQueue(){
        // connect to DB an perform: DELETE FROM operations WHERE status IN ('DONE', 'CANCELED');
    }
    public static function DequeOperation(){
        // connect to DB an perform: SELECT * FROM operations WHERE status = 'PENDING' ORDER BY id ASC LIMIT 0, 1;
        // UPDATE operations SET status = 'PROCESSING', tries = tries+1 WHERE id = $id;
        return 'operation_url';
    }

}

我认为它可能对其他人有用。如您所见,它链接了操作。

于 2012-09-28T14:45:18.173 回答