10

我遇到了在多台服务器上运行的齿轮工工作人员的问题,我似乎无法解决。

当工作服务器脱机而不是取消工作进程时会出现此问题,并导致所有其他工作进程出错和失败。

只有 1 个客户和 2 个工作人员的示例 -

客户:

$client = new GearmanClient ();

$client->addServer ('192.168.1.200');
$client->addServer ('192.168.1.201');

$job = $client->do ('generate_tile', serialize ($arrData));

工人:

$worker = new GearmanWorker ();

$worker->addServer ('192.168.1.200');
$worker->addServer ('192.168.1.201');

$worker->addFunction ('generate_tile', 'generate_tile');

while (1)
{
    if (!$worker->work ())
    {

        switch ($worker->returnCode ())
        {

            default:
                echo "Error: " . $worker->returnCode () . ': ' . $worker->error () . "\n";
                break;

        }

    }
}

function generate_tile ($job) { ... }

工作代码在 2 个独立的服务器上运行。当每台服务器都启动并运行时,两个工作人员都按预期执行作业。当其中一个工作进程被取消时,另一个工作人员按预期执行所有作业。

但是,当带有已取消工作进程的服务器关闭并完全脱机时,对客户端脚本的请求会挂起,并且剩余的工作进程不会执行任何作业。

我从剩余的工作进程中得到以下一组错误:

Error: 46: gearman_con_wait:timeout reached
Error: 46: gearman_con_wait:timeout reached
Error: 4: gearman_con_flush:write:110
Error: 46: gearman_con_wait:timeout reached
Error: 4: gearman_con_flush:write:113
Error: 4: gearman_con_flush:write:113
Error: 4: gearman_con_flush:write:113
....

当我启动另一台服务器,而不是在其上启动工作进程时,剩余的工作进程会立即启动并执行任何剩余的作业。

我似乎很清楚,我需要工作进程中的一些代码来处理任何可能离线的服务器,但是我看不到如何做到这一点。

非常感谢,

安迪

4

5 回答 5

6

我们对多个 gearman 服务器的测试表明,如果列表中的最后一个服务器(在您的情况下为 192.168.1.201)被关闭,工作人员将停止按照您描述的方式执行。(此外,工作人员从最后一个服务器获取工作。只有在 .201 上没有工作时,他们才会在 .200 上处理工作)。

看来这是gearman服务器中链表的错误,据报道已多次修复,但对于所有可用版本的gearman,该错误仍然存​​在。抱歉,我知道这不是解决方案,但我们遇到了同样的问题,但没有找到解决方案。(如果有人可以为这个问题提供可行的解决方案,我同意给予大额赏金)

于 2011-08-16T10:16:36.380 回答
4

除了上面@Darhazer 的评论。我们也发现了这一点并像这样解决了:-

// Gearman workers show a strong preference for servers at the end of a list so randomize the order
$worker = new GearmanWorker();
$s2 = explode(",", Configure::read('workers.servers'));
shuffle($s2);
$servers = implode(",", $s2);
$worker->addServers($servers); 

我们随时运行 6 到 10 个工作人员,并在他们完成 x 个请求后使他们过期。

于 2012-05-28T23:43:04.950 回答
2

我使用这个类,它跟踪哪些作业在哪些服务器上工作。还没有完全测试,现在才写。我已经粘贴了一个经过编辑的版本,因此可能存在拼写错误或类似问题,但似乎可以解决问题。

<?
class MyGearmanClient {
        static $server = "server1,server2,server3";
        static $server_array = false;
        static $workingServers = false;
        static $gmclient = false;
        static $timeout = 5000;
        static $defaultTimeout = 5000;

        static function randomServer() {
                return self::$server_array[rand(0, count(self::$server_array) -1)];
        }

        static function getServer($job = false) {
                if (self::$server_array == false) {
                        self::$server_array = explode(",", self::$server);
                        self::$workingServers = array();
                }

                $serverList = array();
                if ($job) {
                        if (array_key_exists($job, self::$workingServers)) {
                                foreach (self::$server_array as $server) {
                                        if (array_key_exists($server, self::$workingServers[$job])) {
                                                if (self::$workingServers[$job][$server]) {
                                                        $serverList[] = $server;
                                                }
                                        } else {
                                                $serverList[] = $server;
                                        }
                                }
                                if (count($serverList) == 0) {
                                        # All servers have failed, need to insert all the servers again and retry.
                                        $serverList = self::$workingServers[$job] = self::$server_array;
                                }
                                return $serverList[rand(0, count($serverList) - 1)];
                        } else {
                                return self::randomServer();
                        }
                } else {
                        return self::randomServer();
                }
        }

        static function serverWorked($server, $job) {
                self::$workingServers[$job][$server] = $server;
        }

        static function serverFailed($server, $job) {
                self::$workingServers[$job][$server] = false;
        }

        static function Connect($server = false, $job = false) {
                if ($server) {
                        self::$server = self::getServer();
                }

                self::$gmclient= new GearmanClient();
                self::$gmclient->setTimeout(self::$timeout);

                # add the default job server
                self::$gmclient->addServer($server = self::getServer($job));

                return $server;
        }

        static function Destroy() {
                self::$gmclient = false;
        }

        static function Client($name, $vars, $timeout = false) {
                if (is_int($timeout)) {
                        self::$timeout = $timeout;
                } else {
                        self::$timeout = self::$defaultTimeout;
                }


                do {
                        $server = self::Connect(false, $name);
                        $value = self::$gmclient->do($name, $vars);
                        $return_code = self::$gmclient->returnCode();
                        if (!$value) {
                                $error_message = self::$gmclient->error();
                                if ($return_code == 47) {
                                        self::serverFailed($server, $name);
                                        if (count(self::$server_array) > 1) {
                                             // ADDED SINGLE SERVER LOOP AVOIDANCE // echo "Timeout on server $server, trying another server...\n";
                                             continue;
                                        } else {
                                             return false;
                                        }
                                }
                                echo "ERR: $error_message ($return_code)\n";
                        }
                        # printf("Worker has returned\n");
                        $short_value = substr($value, 0, 80);
                        switch ($return_code)
                        {
                        case GEARMAN_WORK_DATA:
                                echo "DATA: $short_value\n";
                                break;
                        case GEARMAN_SUCCESS:
                                self::serverWorked($server, $name);
                                break;
                        case GEARMAN_WORK_STATUS:
                                list($numerator, $denominator)= self::$gmclient->doStatus();
                                echo "Status: $numerator/$denominator\n";
                                break;
                        case GEARMAN_TIMEOUT:
                                // self::Connect();
                                // Fall through
                        default:
                                echo "ERR: $error_message " . self::$gmclient->error() . " ($return_code)\n";
                                break;
                        }
                }
                while($return_code != GEARMAN_SUCCESS);

                $rv = unserialize($value);
                return $rv["rv"];
        }
}

# Example usage:
#    $rv = MyGearmanClient::Client("Function", $args);

?>
于 2011-08-17T09:04:01.890 回答
0

由于来自 gearman 客户端的“addServer”无法正常工作,此代码可以随机选择一个作业服务器,如果失败尝试下一个,这样您就可以平衡负载。

        // job servers
        $jobservers = array('192.168.1.1','192.168.1.2');
        // prepare gearman client
        $gmclient = new GearmanClient();
        // shuffle job servers (deliver jobs equally by server)
        shuffle($jobservers);
        // add job servers
        foreach($jobservers as $jobserver) {
            // add random jobserver
            $gmclient->addServer($jobserver);
            // check server state if ok end foreach
            if (@$gmclient->ping('ping')) break;
            // if connections fails reset client
            $gmclient = new GearmanClient();
        }
于 2017-05-19T15:17:18.600 回答
0

解决方案经过测试并且工作正常。

     $client = new GearmanClient();
     if(!$client->addServer("11.11.65.73",4730))
        $client->addServer("11.11.65.79",4730);
于 2020-12-02T16:43:42.373 回答