10

我有一个后台作业,在 MongoDB 上执行 map/reduce 作业。当用户向文档发送更多数据时,它会启动在文档上运行的后台作业。如果用户发送多个请求,它将为同一个文档启动多个后台作业,但只有一个真正需要运行。有没有办法可以防止多个重复实例?我正在考虑为每个文档创建一个队列,并在提交新作业之前确保它是空的。或者也许我可以以某种方式设置一个与我的文档 ID 相同的作业 ID,并在提交之前检查是否不存在?

另外,我刚刚发现了一个 sidekiq-unique-jobs gem。但是文档不存在。这是我想要的吗?

4

4 回答 4

12

我最初的建议是这个特定工作的互斥锁。但是由于您可能有多个应用程序服务器在处理 sidekiq 作业,因此我建议在 redis 级别进行一些操作。

例如,在你的 sidekiq worker 定义中 使用redis-semaphore 。一个未经测试的例子

def perform
  s = Redis::Semaphore.new(:map_reduce_semaphore, connection: "localhost")

  # verify that this sidekiq worker is the first to reach this semaphore.
  unless s.locked?

    # auto-unlocks in 90 seconds. set to what is reasonable for your worker.
    s.lock(90)
    your_map_reduce()
    s.unlock
  end
end

def your_map_reduce
  # ...
end
于 2013-02-05T18:27:45.300 回答
6

https://github.com/krasnoukhov/sidekiq-middleware

UniqueJobs 为作业提供唯一性。

用法

示例工人:

class UniqueWorker
  include Sidekiq::Worker

  sidekiq_options({
    # Should be set to true (enables uniqueness for async jobs)
    # or :all (enables uniqueness for both async and scheduled jobs)
    unique: :all,

    # Unique expiration (optional, default is 30 minutes)
    # For scheduled jobs calculates automatically based on schedule time and expiration period
    expiration: 24 * 60 * 60
  })

  def perform
    # Your code goes here
  end
end
于 2013-05-31T18:04:07.157 回答
2

还有https://github.com/mhenrixon/sidekiq-unique-jobs (SidekiqUniqueJobs)。

于 2015-02-24T16:20:33.413 回答
0

您可以这样做,假设您已将所有作业添加到 Enqueued 存储桶。

class SidekiqUniqChecker
  def self.perform_unique_async(action, model_name, id)
    key = "#{action}:#{model_name}:#{id}"
    queue = Sidekiq::Queue.new('elasticsearch')
    queue.each { |q| return if q.args.join(':') == key }
    Indexer.perform_async(action, model_name, id)
  end
end

上面的代码只是一个示例,但您可以根据需要对其进行调整。

资源

于 2019-01-12T04:58:21.523 回答