1

sidekiq用来处理异步作业,在增加了一些复杂性之后,我很难了解作业的状态。

这是交易:

我有一个模型Batch在提交后调用异步方法:

# app/models/batch.rb
class Batch < ActiveRecord::Base

  after_commit :calculate, on: :create

  def calculate
    job_id = BatchWorker.perform_async(self.id)

    # update_column skips callbacks and validations!
    self.update_column(:job_id, job_id)
  end
end

Worker 从模型中读取数据,并为每个数据调用一个异步作业,如下所示:

# app/workers/batch_worker.rb
class BatchWorker

  def perform(batch_id)
    batch = Batch.find(batch_id)

    ## read data to 'tab'    

    tab.each do |ts|
      obj = batch.item.create(name: ts[0], data: ts[1])
      job_id = ItemWorker.perform_async(obj.id)
      obj.update_attribute(:job_id, job_id)
    end
  end

end

问题是:那些异步作业执行计算,我不能让下载结果链接在完成之前可用,所以我需要知道所有“子作业”何时完成,所以我可以statusBatch模型中更改一个属性. 换句话说,我不需要知道是否所有作业都已排队,而是需要知道由 生成的所有异步作业ItemWorker是否已执行,并且现在已完成。

  • 实现这一目标的最佳方法是什么?在“并行计算世界”中有意义吗?

Obs.:我不确定将 job_id 存储在 db 中,因为它似乎是易变的。

4

1 回答 1

4

也许为此使用 Redis 可能是一个不错的选择,因为您已经在基础架构中拥有它并在 Rails 应用程序中进行了配置(由于 Sidekiq)

Redis 有一个内置的发布/订阅引擎,以及对键的原子操作——使其适合管理您正在寻找的并发类型。

大概是这样的:

class BatchWorker

  def perform(batch_id)
    batch = Batch.find(batch_id)

    redis = Redis.new
    redis.set "jobs_remaining_#{batch_id}", tab.count
    redis.subscribe("batch_task_complete.#{batch_id}") do |on|
      on.message do |event, data|
        if redis.decr("jobs_remaining_#{batch_id}") < 1
          #UPDATE STATUS HERE
          redis.del "jobs_remaining_#{batch_id}"
        end
      end
    end

    tab.each do |ts|
      obj = batch.item.create(name: ts[0], data: ts[1])
      job_id = ItemWorker.perform_async(obj.id, batch_id)
    end
  end
end

class ItemWorker
  def perform item_id, batch_id=nil
    #DO STUFF
    if batch_id
      Redis.new.publish "batch_task_complete.#{batch_id}"
    end
  end
end
于 2013-03-11T22:29:16.973 回答