12

我最近开始使用 Sidekiq,并注意到它有一个我一直在寻找的很棒的功能:

UserMailer.delay_until(5.days.from_now).find_more_friends_email

基本上我可以在未来安排一份工作,所以我不需要我的应用程序不断地轮询具有开始时间的新事件。

现在这就像一个魅力,但我如何更改工作的开始时间?通常,某些预定事件的开始时间会发生变化。我如何在 sidekiq 中复制它?

我知道我可以删除作业并创建一个新作业,但是否可以只修改开始时间?

编辑

我建立在 Oto Brglez 的想法之上,这是记录在案的代码:

module TaskStuff
  class TaskSetter
    include Sidekiq::Worker
    sidekiq_options retry: false

    def perform(task_id)
      task = Task.find(task_id)
      # Get the worker that's performing this job
      if worker = AppHelpers.find_worker(jid)
        # If the worker matches the "at" timestamp then this is the right one and we should execute it
        if worker.last["payload"]["at"].to_s.match(/(\d*\.\d{0,3})\d*/)[1] == task.start.to_f.to_s.match(/(\d*\.\d{0,3})\d*/)[1]
          task.execute
        else
          custom_logger.debug("This worker is the wrong one. Skipping...")
        end
      else
        custom_logger.error("We couldn't find the worker")
      end
    end
  end
end


module AppHelpers

  [...]

  def self.find_worker(jid)
    Sidekiq::Workers.new.select {|e| e.last["payload"]["jid"] == jid}.first
  end

  [...]

end

> task = Task.create(start: 5.hours.from_now)
> TaskStuff::TastSetter.perform_at(task.start, task.id)

现在如果我这样做

> task.update_attributes(start: 4.hours.from_now)
> TaskStuff::TastSetter.perform_at(task.start, task.id)

该任务将在 4 小时内执行,而另一个作业(将在 5 小时内执行)将在到达时间时被忽略并删除。

最初我尝试使用Time.now而不是,worker.last["payload"]["at"]但这可能非常不准确,因为计划的作业不会总是按时执行。检查间隔为 15 秒,如果所有工作人员都在其他地方忙,则作业可能会进一步延迟。

我不得不使用它Regexp来匹配开始时间,因为在阅读时task.start我可能会得到一个小数位数不同的浮点数,并且 if 条件不会通过。这样,我将两个值都设为小数点后 3 位。

我发现获得工作的“at”属性的唯一方法是通过工人获得它。如果我要问 Redis 或使用 Mike's Sidekiq::ScheduledSet.new,我不会得到当前的工作,因为它已经从 Redis 中撤出。

编辑 2

对于任何感兴趣的人,我采用了类似但不同的方法。基本上,我没有比较 的开始时间,而是Task在模型和 Sidekiq 调用中添加了一个额外的字段,称为start_token. 如果使用与对象相同的令牌调用了 sidekiq 作业,则它是有效的,否则丢弃并跳过该作业。每次模型更改 start_time 时,令牌都会更新。

4

3 回答 3

45

这可能没有回答这个问题,但这个问题首先出现在谷歌上,用于“sidekiq 增加已安排工作的时间”。

Sidekiq 在 SortedEntry 作业上有这个方法(reschedule)。

找工作:

job = Sidekiq::ScheduledSet.new.find_job(job_id)

重新安排工作:

job.reschedule(Time.now + 2.hours)
于 2015-03-06T14:05:50.657 回答
5

我不认为编辑/更新工作是方式。每次发生变化时,我通常都会创建新工作。然后在作业本身中检查执行某些任务的时间是否正确...如果时间正确,请继续执行任务,否则跳过...

于 2013-04-16T08:51:52.697 回答
0

您还可以通过使用 job.reschedule() 方法来做一件事,另一种方法是删除您的第一个计划作业,然后使用新时间再次添加。

代码:

  bugs = @user.bugs
  queue = Sidekiq::ScheduledSet.new
  queue.each do |job|
    job.delete if bugs.ids.include? job.args.first
  end

这将删除与用户错误 ID 匹配的所有作业

于 2018-10-01T10:47:12.403 回答