6

我有一个从多个 IMAP 帐户获取大量电子邮件的 rails 应用程序。

  • 我使用sidekiq来处理这些工作。
  • 我使用sidetiq来安排作业。
  • 我使用redis-semaphore来确保同一用户的重复作业不会相互绊倒。

2个问题:

  • 1:当一个作业点击“if s.lock”时, redis-semaphore将其暂停,直到所有之前的作业都完成。我需要取消作业而不是排队。
  • 2:如果在作业过程中引发异常,导致崩溃,sidekiq会将作业放回队列中进行重试。我需要取消作业而不是排队。将“sidekiq_options :retry => false”放入代码中似乎没有什么不同。

我的代码:

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
    if s.lock
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
  end
end
4

2 回答 2

6

1. 创建一个Redis子类和重载以blpop接受.-1lpop

redis- semaphore调用@redis.blpop. Redis::Semaphore#lock虽然您可以重载lock要使用的方法@redis.lpop,但更简单的方法是将自定义实例传递Redis给信号量。

将以下内容放在lib您的 rails 应用程序中,并在您的config/initializers/sidekiq.rb(或做任何您喜欢加载以下类的偏好)中使用它。

class NonBlockingRedis < Redis
  def blpop(key, timeout)
    if timeout == -1
      result = lpop(key)
      return result if result.nil?
      return [key, result]
    else
      super(key, timeout)
    end
  end
end

每当您调用Redis::Semaphore.new时,传递一个:redis带有该类的新实例的键NonBlockingRedis

调用s.lockwith-1作为参数来使用lpop而不是blpop.

s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock -1
  user = User.where(_id: user_id).first
  emails = ImapMails.receive_mails(user)
  s.unlock
end

2.sidekiq_options retry: false在您的工人阶级中使用应该可以工作,请参见下面的示例。

在您的问题中,您没有指定在重试队列中结束作业时遇到问题的工作人员。由于FetchMailsJobs最终将ImapJob作业排队,因此前者中的异常可能会导致看起来ImapJob正在重新排队。

使用您的信号量锁,将您的工作包装在一个begin rescue ensure块中也是一个好主意。

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  sidekiq_options retry: false

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  sidekiq_options retry: false

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
    if s.lock - 1
      begin
        user = User.where(_id: user_id).first
        emails = ImapMails.receive_mails(user)
      rescue => e
        # ignore; do nothing
      ensure
        s.unlock
      end
    end
  end
end

有关更多信息,请参阅sidekiq 高级选项:workers

于 2013-05-05T05:47:00.883 回答
1

难道不能放弃使用redis-semaphore并使用sidekiq-unique-jobs gem吗?它似乎是一个包含精美的工具,可以完全满足您的需求。

于 2013-11-20T19:46:48.710 回答