我找不到适合 Ruby 的 ThreadPool 实现,所以我写了我的(部分基于这里的代码:http://web.archive.org/web/20081204101031/http://snippets.dzone.com: 80/ posts/show/3276 ,但更改为等待/信号和其他实现 ThreadPool 关闭。但是在运行一段时间后(有 100 个线程并处理大约 1300 个任务),它在第 25 行因死锁而死 - 它等待新工作有任何想法,为什么会发生?
require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 
class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end
    def name
      @thread.inspect
    end
    def get_block
      @mutex.synchronize {@block}
    end
    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end
    def reset_block
      @mutex.synchronize {@block = nil}
    end
    def busy?
      @mutex.synchronize {!@block.nil?}
    end
    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end
  attr_accessor :max_size
  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end
  def size
    @mutex.synchronize {@workers.size}
  end
  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end
  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown
  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end
  # Used by workers to report ready status
  def signal
    @cv.signal
  end
  private
  def get_worker
    free_worker || create_worker
  end
  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end
  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end