我们有多个任务要在同一个 api 上运行,因为 api 是速率限制的,我们需要限制调用的次数(100 x 120 秒),这样我们就不会被锁定。每个锁定持续 30 分钟,会冻结整个团队并可能产生可怕的问题。
系统建立在 Laravel 之上,我们使用队列来进行这个 api 调用。我们需要的第一项工作是,系统每晚都会对 api 进行大约 950 次调用,以将 api 上的内容与我们的数据库同步。第二个和第三个作业将是按需的,它可能需要在用户需要时分别运行大约 100 到 200 个调用。(多个用户可能在一天中的同一时间需要这个,所以队列应该防止我们超过速率限制)。
我们在主管中设置了所有内容,只有一名工人,而且似乎工作正常。但后来我们意识到它并没有按需调用所有作业(第 2 和第 3 类),如果用户需要对 8 件事情采取行动,他们会出现在队列中,但只运行 2 或 3。其余的会得到永远不要拿起或不运行,甚至没有失败。通过将工人数量增加到 8 个,这在一定程度上得到了解决。在我们这样做之后,如果用户需要对 8 个工作采取行动,但我们每晚的 900 个任务开始失败。
我们还注意到它可以处理 8 个工作,但如果我们让用户移动多达 50 个任务,它也会失败。
我们在 /etc/supervisor/conf.d 上的主管配置:
[unix_http_server]
file = /tmp/supervisor.sock
chmod = 0777
chown= ubuntu:ubuntu
[supervisord]
logfile = /home/ubuntu/otmas/supervisord.log
logfile_maxbytes = 50MB
logfile_backups=10
loglevel = info
pidfile = /tmp/supervisord.pid
nodaemon = false
minfds = 1024
minprocs = 200
umask = 022
user = ubuntu
identifier = supervisor
directory = /tmp
nocleanup = true
childlogdir = /tmp
strip_ansi = false
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl = unix:///tmp/supervisor.sock
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/ubuntu/www/artisan horizon
autostart=true
autorestart=true
user=ubuntu
numprocs=8
redirect_stderr=true
stdout_logfile=/home/ubuntu/otmas/worker.log
我们的队列配置:
'connections' => [
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 460,
'block_for' => 140,
],
],
我们的地平线配置:
'environments' => [
'production' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default'],
'balance' => 'simple',
'processes' => 10,
'tries' => 3,
],
],
'local' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default'],
'balance' => 'simple',
'processes' => 3,
'tries' => 3,
],
],
'develop' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default','email'],
'balance' => 'auto',
'processes' => 3,
'tries' => 3,
'timeout' => 3,
'delay' => 3,
],
],
],
我们在 database.php 上的 redis 配置:
'redis' => [
'client' => 'predis',
'default' => [
'host' => env('REDIS_HOST', '127.0.0.1'),
'password' => env('REDIS_PASSWORD', null),
'port' => env('REDIS_PORT', 6379),
'database' => env('REDIS_DB', 0),
'read_write_timeout' => -1,
],
'cache' => [
'host' => env('REDIS_HOST', '127.0.0.1'),
'password' => env('REDIS_PASSWORD', null),
'port' => env('REDIS_PORT', 6379),
'database' => env('REDIS_CACHE_DB', 1),
'read_write_timeout' => -1,
],
],
夜间作业:
class GetProjectTasks implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $project;
public $tries = 540;
public $timeout = 15;
public function __construct(ZohoProject $project)
{
$this->project = $project;
}
public function handle()
{
\Log::info("before redis get tasks for project: ".$this->project->id);
try {
$client = new GuzzleClient();
$accessToken = Setting::where('name','zoho_access_token')->get()->first();
$requestHeaders = [
'cache-control' => 'no-cache',
'Authorization' => 'Bearer '.$accessToken->value
];
$requestOptions = [
'headers' => $requestHeaders
];
$zohoTasksRestApi = Setting::where('name','zoho_projectapi_restapi')->get()->first()->value;
$project = $this->project->id;
} catch(Exception $e) {
Log::notice('Exception caught');
throw $e;
}
Redis::throttle('zohoprojecttasksget')->allow(85)->every(120)->then(function () use ($client, $zohoTasksRestApi, $portalId, $project, $requestOptions) {
try {
$zohoTasksResponse = $client->request('GET', $zohoTasksRestApi.'portal/'.$portalId.'/projects/'.$project.'/tasks/', $requestOptions);
$zohoTasksResult = json_decode((string) $zohoTasksResponse->getBody(), true);
if($zohoTasksResult) {
foreach ($zohoTasksResult['tasks'] as $key => $task) {
if (array_has($task, 'start_date') && array_has($task, 'end_date')) {
$taskStartDate = \Carbon\Carbon::createFromTimestamp((int)$task['start_date_long']/1000, 'America/Santiago');
$taskEndDate = \Carbon\Carbon::createFromTimestamp((int)$task['end_date_long']/1000, 'America/Santiago');
$zohoTask = ZohoTask::updateOrCreate(['id' => $task['id']], [
'name' => $task['name'],
'zoho_project_id' => $project,
'registry' => json_encode($task),
'start_date' => $taskStartDate,
'end_date' => $taskEndDate
]);
if(array_has($task, ['details.owners'])) {
if($task['details']['owners'][0]['name'] != 'Unassigned') {
foreach ($task['details']['owners'] as $key => $owner) {
$user = ZohoUser::find($owner['id']);
$zohoTask->assignees()->save($user);
}
}
}
} else {
$zohoTask = ZohoTask::updateOrCreate(['id' => $task['id']], [
'name' => $task['name'],
'zoho_project_id' => $project,
'registry' => json_encode($task),
]);
}
}
}
} catch(Exception $e) {
Log::notice('Exception caught');
throw $e;
}
}, function () use ($project) {
return $this->release(85);
});
}
}
按需作业示例:
class ReplaceTask implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $task, $taskToReplace;
public $tries = 140;
public $timeout = 15;
public function __construct(ZohoTask $task, ZohoTask $taskToReplace)
{
$this->task = $task;
$this->taskToReplace = $taskToReplace;
}
public function handle()
{
\Log::info("before redis replace task: ".$this->taskToReplace.' with task:: '.$this->task);
try {
$client = new GuzzleClient();
$accessToken = Setting::where('name','zoho_access_token')->get()->first();
$requestHeaders = [
'cache-control' => 'no-cache',
'Authorization' => 'Bearer '.$accessToken->value
];
$ZohoProjectsRestApi = Setting::where('name','zoho_projectapi_restapi')->get()->first()->value;
$task = $this->task;
$taskToReplace = $this->taskToReplace;
$project = $task->project;
} catch(Exception $e) {
Log::notice('Exception caught');
throw $e;
}
\Log::info("before redis task: ".$task->id.' to replace '.$taskToReplace->id.' Project: '.$project->id);
Redis::throttle('zohoupdatetask')->allow(50)->every(120)->then(function () use ($client, $ZohoProjectsRestApi, $portalId, $project, $requestHeaders, $task, $taskToReplace) {
try {
$assignee = $taskToReplace->assignees->last();
$assignResponse = $client->request('POST', $ZohoProjectsRestApi.'portal/'.$portalId.'/projects/'.$project->id.'/tasks/'.$task->id.'/', [
'headers' => $requestHeaders,
'form_params' => [
'start_date' => $taskToReplace->start_date->format('m-d-Y'),
'start_time' => $taskToReplace->start_date->format('h:i A'),
'person_responsible' => $assignee->id
]
]);
\Log::info($assignResponse->getBody());
if($assignResponse->getStatusCode() == 200) {
$task->assignees()->detach();
$assignResult = json_decode((string) $assignResponse->getBody(), true);
$taskRegistry = $assignResult['tasks'][0];
$taskStartDate = \Carbon\Carbon::createFromTimestamp((int)$taskRegistry['start_date_long']/1000, 'America/Santiago');
$taskEndDate = \Carbon\Carbon::createFromTimestamp((int)$taskRegistry['end_date_long']/1000, 'America/Santiago');
$task->start_date = $taskStartDate;
$task->end_date = $taskEndDate;
$task->registry = json_encode($taskRegistry);
$task->assignees()->save($assignee);
$task->save();
\Log::info("Task Ok move");
}
} catch(Exception $e) {
Log::notice('Exception caught');
throw $e;
}
}, function () use ($project, $task, $taskToReplace) {
\Log::info("Task hit throttle");
return $this->release(120);
});
\Log::info("after redis");
}
}
地平线任务:(被搁置的任务永远不会运行或拿起或任何东西) 任务出现在队列中,但似乎永远不会运行或失败