2

我有一个 ActiveJob,它应该通过 HTTP 从外部系统加载一段数据。当该作业完成时,我想将第二个作业排入队列,该作业进行一些后处理,然后将数据提交到不同的外部系统。

我不想让第一份工作知道第二份工作,因为

  1. 封装
  2. 可重用性
  3. 基本上不关第一份工作的事

同样,我不希望第一份工作关心如果数据加载失败接下来会发生什么——也许用户会收到通知,也许我们在超时后重试,也许我们只是记录它并举手——再一次可能会根据异常的详细信息而有所不同,并且作业不需要包含该异常的逻辑或与其他系统的连接来处理它。

在 Java 中(这是我最有经验的地方),我可以使用 Guava 的ListenableFuture之类的东西在事后添加成功和失败回调:

MyDataLoader loader = new MyDataLoader(someDataSource)
ListenableFuture<Data> future = executor.submit(loader);
Futures.addCallback(future, new FutureCallback<Data>() {
    public void onSuccess(Data result) {
        processData(result);
    }
    public void onFailure(Throwable t) {
        handleFailure(t);
    }
});

不过,ActiveJob 似乎并没有提供这种外部回调机制——我可以从“Active Job Basics”中的相关 部分after_perform了解到,并且rescue_from只能从作业类中调用。after_peform并不意味着区分成功和失败。

所以我能想到的最好的方法(我并不是说它非常好)是将几个 lambdas 传递给工作的perform方法,因此:

class MyRecordLoader < ActiveJob::Base

  # Loads data expensively (hopefully on a background queue) and passes
  # the result, or any exception, to the appropriate specified lambda.
  #
  # @param data_source [String] the URL to load data from
  # @param on_success [-> (String)] A lambda that will be passed the record
  #   data, if it's loaded successfully
  # @param on_failure [-> (Exception)] A lambda that will be passed any
  #   exception, if there is one
  def perform(data_source, on_success, on_failure)
    begin
      result = load_data_expensively_from data_source
      on_success.call(result)
    rescue => exception
      on_failure.call(exception)
    end
  end

end

(旁注:我不知道将 lambdas 声明为参数的 yardoc 语法是什么。这看起来是否正确,或者,如果不这样做,是否合理?)

然后,调用者必须将这些传入:

MyRecordLoader.perform_later(
  some_data_source,
  method(:process_data),
  method(:handle_failure)
)

这并不可怕,至少在调用方,但它看起来很笨重,我不禁怀疑有一个我只是没有找到的常见模式。而且我有点担心,作为一个 Ruby/Rails 新手,我只是让 ActiveJob 去做一些它本来就不想做的事情。我发现的所有 ActiveJob 示例都是“一劳永逸”——异步“返回”结果似乎不是 ActiveJob 用例。

此外,我不清楚这是否适用于像 Resque 这样在单独进程中运行作业的后端。

执行此操作的“Ruby 方式”是什么?


更新:正如dre-hh 所暗示的那样,ActiveJob 在这里被证明不是正确的工具。它也是不可靠的,并且对于这种情况过于复杂。我改用Concurrent Ruby,它更适合用例,并且由于任务主要受 IO 限制,即使在 MRI 上也足够快,尽管有 GIL

4

1 回答 1

2

ActiveJob 不是一个像 future 或 promise 那样的异步库。

它只是一个用于在后台执行任务的界面。当前线程/进程没有收到此操作的结果。

例如当使用 Sidekiq 作为 ActiveJob 队列时,它会将 perform 方法的参数序列化到 redis 存储中。在您的 rails 应用程序上下文中运行的另一个守护进程将监视 redis 队列并使用序列化数据实例化您的工作者。

所以传递回调可能没问题,但是为什么将它们作为另一个类的方法。如果这些回调是动态的(在不同的调用上发生变化),则传递回调将是有意义的。但是,当您在调用类上实现它们时,请考虑将这些方法移到您的作业工作者类中。

于 2015-03-16T19:38:34.273 回答