I have a set of MessageConsumer workers, each with a different responsibility: HTTP calls, Mongo/Redis CRUD, API calls, and so on.

They all share the same structure:

class MessageConsumer
  include Celluloid

  def perform(sqs_message)
    # Do something
  end
end

Each worker has its own [worker-name].rb file, which contains:

Celluloid::Actor[:pool] = MessageConsumer.pool

while @still_running
  sqs_message =  @queue.receive_message(start_options)

  if sqs_message
    Celluloid::Actor[:pool].async.perform(sqs_message)
  else
    # sleep for a while as there's nothing in the queue.
    sleep rand(2..6)
  end
end

@queue.receive_message receives a message from Amazon SQS, and the loop then hands that message to a worker in the pool.

On each server we run a set of these [worker-name].rb processes:

pgrep -fl ruby

14885 ruby bin/worker_http # two processes
15890 ruby bin/worker_http # ^^^
17956 ruby bin/worker_api
19734 ruby bin/worker_mongo
22637 ruby bin/worker_redis

The problem: after the processes have been running for a while (once the threads have been kept busy), I frequently get "No live threads left. Deadlock?".

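The servers run ruby 2.0.0p451 (2014-02-24 revision 45167) [x86_64-linux]. I am not sure whether this is related to MRI; maybe I need to switch to JRuby. Interestingly, though, this does not seem to be a common problem, so I suspect the issue is in my implementation.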

Any ideas?

1 Answer

It seems that passing a time limit to join avoids this error:

     [1] pry(main)> task = Thread.new { Thread.stop }
     => #<Thread:0x000055918c669018@(pry):1 sleep_forever>
     [2] pry(main)> task.join
     fatal: No live threads left. Deadlock?
     2 threads, 2 sleeps current:0x000055918bc08ef0 main thread:0x000055918bc08ef0
     * #<Thread:0x000055918bc3cbf8 sleep_forever>
        rb_thread_t:0x000055918bc08ef0 native:0x00007efd46186080 int:0
        (pry):2:in `join'
        (pry):2:in `__pry__'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_instance.rb:355:in `eval'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_instance.rb:355:in `evaluate_ruby'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_instance.rb:323:in `handle_line'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_instance.rb:243:in `block (2 levels) in eval'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_instance.rb:242:in `catch'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_instance.rb:242:in `block in eval'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_instance.rb:241:in `catch'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_instance.rb:241:in `eval'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/repl.rb:77:in `block in repl'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/repl.rb:67:in `loop'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/repl.rb:67:in `repl'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/repl.rb:38:in `block in start'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/input_lock.rb:61:in `__with_ownership'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/input_lock.rb:79:in `with_ownership'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/repl.rb:38:in `start'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/repl.rb:13:in `start'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/pry_class.rb:192:in `start'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/lib/pry/cli.rb:116:in `start'
        /home/mifrill/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/gems/pry-0.11.3/bin/pry:12:in `<top (required)>'
        /home/mifrill/.rbenv/versions/2.5.1/bin/pry:23:in `load'
        /home/mifrill/.rbenv/versions/2.5.1/bin/pry:23:in `<main>'
     * #<Thread:0x000055918c669018@(pry):1 sleep_forever>
        rb_thread_t:0x000055918c742dd0 native:0x00007efd4377e700 int:0
         depended by: tb_thread_id:0x000055918bc08ef0
        (pry):1:in `stop'
        (pry):1:in `block in __pry__'
     from (pry):2:in `join'
     [3] pry(main)> task.join 1
     => nil
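
The difference between the two join calls in the session can be sketched outside pry. This is a minimal illustration using only plain Ruby threads, not Celluloid; the helper name finished_within? is hypothetical. Thread#join with a limit returns nil when the limit elapses and returns the thread itself when the thread has finished, so the caller is never parked indefinitely.

```ruby
# Hypothetical helper: true if the thread finished within `limit` seconds.
# Thread#join(limit) returns nil on timeout and the thread otherwise.
def finished_within?(thread, limit)
  !thread.join(limit).nil?
end

parked = Thread.new { Thread.stop } # parks itself and is never woken up
quick  = Thread.new { :done }       # finishes almost immediately

parked_finished = finished_within?(parked, 0.2)
quick_finished  = finished_within?(quick, 0.2)

puts "parked finished? #{parked_finished}"
puts "quick finished?  #{quick_finished}"

parked.kill # clean up the parked thread
```

Because the waiting thread only sleeps for a bounded time, the interpreter does not treat it as sleeping forever, which is the condition behind the fatal error shown in the session.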

Try adding the timeout in lib/celluloid/actor.rb:

  def join(actor, timeout = nil)
    actor.thread.join(timeout)
    actor
  end
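
One thing to keep in mind with a method of this shape: it returns the actor whether or not the timeout elapsed, so the caller has to inspect the thread to tell the two cases apart. The sketch below assumes a stand-in StubActor struct and an ActorJoin module, both hypothetical and used only so the example runs without Celluloid.

```ruby
# Hypothetical stand-in for an actor: only the #thread reader is modelled.
StubActor = Struct.new(:thread)

# Hypothetical wrapper with the same body as the method above.
module ActorJoin
  def self.join(actor, timeout = nil)
    actor.thread.join(timeout)
    actor
  end
end

stuck = StubActor.new(Thread.new { Thread.stop }) # never finishes on its own
done  = StubActor.new(Thread.new { :ok })         # finishes right away

ActorJoin.join(stuck, 0.2)
ActorJoin.join(done, 0.2)

# The return value is the actor in both cases, so check the thread instead.
stuck_alive = stuck.thread.alive?
done_alive  = done.thread.alive?

puts "stuck still alive? #{stuck_alive}"
puts "done still alive?  #{done_alive}"

stuck.thread.kill # clean up
```

A thread that is still alive after the bounded join is one that did not finish in time; what to do with it (retry, log, kill) is left to the caller.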
answered 2018-10-17T15:48:39.733