3

我正在使用 Rails 5。我有这个用于管理线程的宝石......

gem 'concurrent-ruby'

我注意到如果我的一个线程抛出一个错误,它就会被吞掉,我永远不会发现它。我在控制台中试过这个

pool = Concurrent::FixedThreadPool.new(1)
  # => #<Concurrent::FixedThreadPool:0x007fe3585ab368 @__lock__=#<Thread::Mutex:0x007fe3585ab0c0>, @__condition__=#<Thread::ConditionVariable:0x007fe3585ab098>, @min_length=1, @max_length=1, @idletime=60, @max_queue=0, @fallback_policy=:abort, @auto_terminate=true, @pool=[], @ready=[], @queue=[], @scheduled_task_count=0, @completed_task_count=0, @largest_length=0, @ruby_pid=23932, @gc_interval=30, @next_gc_time=252232.13299, @StopEvent=#<Concurrent::Event:0x007fe3585aaf30 @__lock__=#<Thread::Mutex:0x007fe3585aaeb8>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aae90>, @set=false, @iteration=0>, @StoppedEvent=#<Concurrent::Event:0x007fe3585aadc8 @__lock__=#<Thread::Mutex:0x007fe3585aad78>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aad50>, @set=false, @iteration=0>> 
nums.each do |num|
  pool.post do
    if num == 1
      asdfasdf
    end
  end
end
  # => [1, 2, 3] 
pool.shutdown             # => true 
pool.wait_for_termination # => true 

我想知道,如果我的池中的一个线程抛出错误,我可以在所有线程都完成后抛出异常,从而停止我的程序。如果没有任何线程抛出错误,那么我可以继续处理正在发生的事情。

在上面,你会注意到我故意导致了一个应该导致错误的条件,但我从来没有发现它,因为我猜线程池正在吞噬异常的输出。

4

2 回答 2

3

如果您需要内置的异常处理,您应该使用更高级别的抽象而不是直接使用线程池。请参阅以下作者的评论concurrent-ruby

大多数应用程序不应直接使用线程池。线程池是供内部使用的低级抽象。该库中的所有高级抽象(PromiseActor等)都将作业发布到全局线程池并都提供异常处理。只需选择最适合您的用例的抽象并使用它。

如果您觉得需要配置自己的线程池而不是使用全局线程池,您仍然可以使用高级抽象。它们都支持:executor允许您注入自定义线程池的选项。然后,您可以使用高级抽象提供的异常处理。

这是使用Promise抽象的示例的变体。一旦线程池引发异常,这将重新引发异常:

require 'concurrent'
pool = Concurrent::FixedThreadPool.new(1)
promises = (1..10).map do |num|
  Concurrent::Promise.execute(executor: pool) do
    if num == 1
      asdfasdf
    else
      num
    end
  end
end
promises.map(&:value!)

# NameError: undefined local variable or method `asdfasdf' for main:Object
#     from (irb):57:in `block (2 levels) in irb_binding'
#     [...]

要仅在所有线程完成后(而不是在第一个异常时立即)重新引发异常,您可以替换promises.map(&:value!)Concurrent::Promise.zip(*promises).value!.

要将异常存储在集合结果中而不重新引发它,您可以执行以下操作promises.map { |p| p.value || p.reason }

# => [#<NameError: undefined local variable or method `asdfasdf' for main:Object>, 2, 3, 4, 5, 6, 7, 8, 9, 10]

最后,请注意,只有 1 个线程的固定线程池将在单个线程上按顺序执行所有任务。要并行执行它们(在具有 10 个线程的池上),请将线程池初始化程序更改为pool = Concurrent::FixedThreadPool.new(10).

于 2017-02-01T18:06:44.033 回答
3

要回答您的问题 - 没有实际的方法,因为库明确地使异常静音并且没有配置。

一种可能的解决方法是手动捕获异常:

error = nil
pool = Concurrent::FixedThreadPool.new(1)
numbers.each do |number|
  pool.post do
    begin
      some_dangerous_action(number)
    rescue Exception => e
      error = e
      raise # still let the gem do its thing
    end
  end
end

pool.shutdown
pool.wait_for_termination

raise error if error
于 2017-02-07T21:01:13.230 回答