7

我有一个向远程服务发送请求的队列。有时这项服务会进行维护。当遇到这种情况时,我希望所有队列任务在 10 分钟内暂停并重试。我该如何实施?

4

2 回答 2

7

您可以使用Queue::looping()事件侦听器暂停整个队列或连接(而不仅仅是单个作业类)。与其他方法不同,这不会在队列暂停时将每个作业置于 pop/requeue 循环中,这意味着尝试次数不会增加

这是文档所说的:

使用外观looping上的方法Queue,您可以指定在工作人员尝试从队列中获取作业之前执行的回调。

https://laravel.com/docs/5.8/queues#job-events

没有很好地记录的是,如果回调返回,false那么工作人员将不会获取另一个作业。例如,这将阻止default队列运行:

Queue::looping(function (\Illuminate\Queue\Events\Looping $event) {
    // $event->connectionName (e.g. "database")
    // $event->queue (e.g. "default")

    if ($event->queue == 'default') {
        return false;
    }
});

注意:queue事件的属性将包含工作进程启动时来自命令行的值,因此如果您的工作人员正在检查多个队列(例如artisan queue:work --queue=high,default),那么queue事件中的值将是'high,default'。作为预防措施,您可能希望用逗号分解字符串并检查是否default在列表中。

因此,例如,如果您想创建一个基本的断路器以在您的邮件服务返回维护错误时暂停mail队列,那么您可以在EventServiceProvider.php中注册一个这样的监听器:

/**
 * Register any events for your application.
 *
 * @return void
 */
public function boot()
{
    parent::boot();

    Queue::looping(function (\Illuminate\Queue\Events\Looping $event) {
        if (($event->queue == 'mail') && (cache()->get('mail-queue-paused'))) {
            return false;
        }
    });
}

这假设您在应用程序的其他地方有一种机制来检测适当的情况,并且在此示例中,该机制需要为mail-queue-paused共享缓存中的键分配一个值(因为这是我的代码正在检查的内容)。有更强大的解决方案,但是在缓存中设置一个特定的众所周知的键(并自动过期)很简单,并且可以达到预期的效果。

于 2019-02-26T02:47:24.247 回答
2
<?php

namespace App\Jobs;

use ...

class SendRequest implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    const REMOTE_SERVER_UNAVAILABLE = 'remote_server_unavailable';

    private $msg;
    private $retryAfter;

    public function __construct($msg)
    {
        $this->msg = $msg;
        $this->retryAfter = 10;
    }

    /**
    * Execute the job.
    *
    * @return void
    */
    public function handle(){
        try {
            // if we have tried sending the request and get a RemoteServerException, we will
            // redispatch the job directly and return.
            if(Cache::get(self::REMOTE_SERVER_UNAVAILABLE)) {
                self::dispatch($this->msg)->delay(Carbon::now()->addMinutes($this->retryAfter));
                return;                  
            }
            // send request to remote server
            // ...
        } catch (RemoteServerException $e) {
            // set a cache value expires in 10 mins if not exists.
            Cache::add(self::REMOTE_SERVER_UNAVAILABLE,'1', $this->retryAfter);
            // if the remote service undergoes a maintenance, redispatch a new delayed job.
            self::dispatch($this->msg)->delay(Carbon::now()->addMinutes($this->retryAfter));            
        }
    }
}
于 2017-09-10T13:21:21.640 回答