2

请参阅下面的更新问题。

原始问题:

在我当前的 Rails 项目中,我需要解析大型 xml/csv 数据文件并将其保存到 mongodb。现在我使用以下步骤:

  1. 接收用户上传的文件,将数据存入mongodb
  2. 使用 sidekiq 对 mongodb 中的数据进行异步处理。
  3. 处理完成后,删除原始数据。

对于localhost中的中小数据,上述步骤运行良好。但是在heroku中,我使用hirefire来动态地上下缩放worker dyno。当工人仍在处理大数据时,hirefire 会看到空队列并缩小工人 dyno。这会向进程发送终止信号,并使进程处于未完成状态。

我正在寻找一种更好的解析方法,允许解析过程随时被终止(在接收到终止信号时保存当前状态),并允许进程重新排队。

现在我正在使用 Model.delay.parse_file 并且它不会重新排队。

更新

在阅读了 sidekiq wiki 之后,我找到了关于作业控制的文章。谁能解释代码,它是如何工作的,以及在接收到 SIGTERM 信号并且工作人员重新排队时如何保持其状态?

有没有其他方法可以处理工作终止、保存当前状态并从最后一个位置继续?

谢谢,

4

2 回答 2

6

可能更容易解释过程和高级步骤,给出一个示例实现(我使用的一个精简版本),然后讨论 throw 和 catch:

  1. 插入带有递增索引的原始 csv 行(以便以后能够从特定行/索引恢复)
  2. 处理停止每个“块”的 CSV,通过检查是否Sidekiq::Fetcher.done?返回 true来检查作业是否完成
  3. 当 fetcher 为done?时,将当前处理的 item 的索引存储在用户上并返回,以便将作业completes和控制返回给 sidekiq。
  4. 请注意,如果一个作业在短暂超时(默认 20 秒)后仍在运行,则该作业将被终止。
  5. 然后当作业再次简单运行时,从上次中断的位置(或 0)开始

例子:

    class UserCSVImportWorker
      include Sidekiq::Worker

      def perform(user_id)
        user = User.find(user_id)

        items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
        items.each_with_index do |item, i|
          if (i+1 % 100) == 0 && Sidekiq::Fetcher.done?
            user.update(last_csv_index: item.index)

            return
          end

          # Process the item as normal
        end
      end
    end

上面的类确保每 100 个项目我们检查 fetcher 没有完成(如果关闭已启动的代理),并结束作业的执行。但是,在执行结束之前,我们会使用index已处理的最后一个用户更新用户,以便我们可以从下次中断的地方开始。

throw catch 是一种更简洁(也许)实现上述功能的方法,但有点像使用 Fibers,不错的概念但很难绕开你的脑袋。从技术上讲, throw catch 比大多数人通常习惯的更像 goto。

编辑

此外,您无法调用Sidekiq::Fetcher.done?并记录last_csv_index每一行或处理的每一行块,这样,如果您的工人在没有机会记录的情况下被杀,last_csv_index您仍然可以“接近”您离开的地方恢复。

于 2014-07-08T15:18:46.310 回答
3

您正在尝试解决幂等性的概念,即多次处理具有潜在不完整循环的事物不会导致问题的想法。(https://github.com/mperham/sidekiq/wiki/Best-Practices#2-make-your-jobs-idempotent-and-transactional

可能的前进步骤

  1. 将文件拆分为多个部分,并使用每个部分的作业处理这些部分。
  2. 提高hirefire的门槛,以便在工作可能完全完成(10分钟)时扩展
  3. 工作正在运行时,不允许hirefire缩小(在开始时设置redis键并在完成时清除)
  4. 跟踪正在处理的作业的进度,并在作业被终止时从您离开的地方继续。
于 2014-07-05T04:04:45.397 回答