4

Ruby 中关于条件变量的资源并不多,但大部分都是错误的。像ruby​​-doc教程在这里或在这里发帖- 他们都可能陷入僵局。

我们可以通过按给定顺序启动线程并可能sleep在其间放置一些线程来强制同步来解决问题。但这只是推迟了真正的问题。

我将代码重写为经典的生产者-消费者问题

require 'thread'
queue = []
mutex = Mutex.new
resource = ConditionVariable.new
threads = []

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do 
      resource.wait(mutex)
      value = queue.pop
      print "consumed #{value}\n"
    end
  end
end

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do
      queue << i
      print "#{i} produced\n"
      resource.signal
    end
    sleep(1) #simulate expense
  end
end

threads.each(&:join)

有时你会得到这个(但并非总是如此):

0 produced
1 produced
consumed 0
2 produced
consumed 1
3 produced
consumed 2
4 produced
consumed 3
producer-consumer.rb:30:in `join': deadlock detected (fatal)
        from producer-consumer.rb:30:in `each'
        from producer-consumer.rb:30:in `<main>'

什么是正确的解决方案?

4

4 回答 4

2

问题是,正如您之前评论的那样,这种方法只有在您可以保证消费者线程在我们的程序开始时首先获取互斥锁时才有效。如果不是这种情况,则会发生死锁,因为resource.signal您的第一个生产者线程将在消费者线程尚未等待资源时发送。结果 this firstresource.signal基本上不会做任何事情,所以你最终会遇到这样一个场景,你调用了resource.signal4 次(因为第一个丢失了),而resource.wait被调用了 5 次。这意味着消费者将永远等待,并发生死锁。

幸运的是,我们可以通过仅在没有更多即时工作可用时才允许消费者线程开始等待来解决这个问题。

require 'thread'
queue = []
mutex = Mutex.new
resource = ConditionVariable.new
threads = []

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do
      if queue.empty?
        resource.wait(mutex)
      end
      value = queue.pop
      print "consumed #{value}\n"
    end
  end
end

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do
      queue << i
      print "#{i} produced\n"
      resource.signal
    end
    sleep(1) #simulate expense
  end
end

threads.each(&:join)
于 2017-05-29T01:56:48.347 回答
1

这是具有多个消费者和生产者以及MonitorMixin使用的更强大的解决方案,MonitorMixin具有特殊ConditionVariable的 withwait_while()wait_until()方法

require 'monitor'

queue = []
queue.extend(MonitorMixin)
cond = queue.new_cond
consumers, producers = [], []

for i in 0..5
  consumers << Thread.start(i) do |i|
      print "consumer start #{i}\n"
      while (producers.any?(&:alive?) || !queue.empty?)
        queue.synchronize do
        cond.wait_while { queue.empty? }
        print "consumer #{i}: #{queue.shift}\n"
      end
      sleep(0.2) #simulate expense
    end
  end
end

for i in 0..3
  producers << Thread.start(i) do |i|
    id = (65+i).chr
    for j in 0..10 do
      queue.synchronize do
        item = "#{j} #{id}"
        queue << item
        print "producer #{id}: produced #{item}\n"
        j += 1
        cond.broadcast
      end
      sleep(0.1) #simulate expense
    end
  end
end

sleep 0.1 while producers.any?(&:alive?)
sleep 0.1 while consumers.any?(&:alive?)

print "queue size #{queue.size}\n"
于 2012-10-26T14:50:50.360 回答
0

基于一个论坛主题,我想出了一个可行的解决方案。它强制线程之间的交替,这并不理想。我们想要消费者和生产者的多个线程是什么?

queue = []
mutex = Mutex.new
threads = []

next_run = :producer

cond_consumer = ConditionVariable.new
cond_producer = ConditionVariable.new

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do
      until next_run == :consumer
        cond_consumer.wait(mutex)
      end

      value = queue.pop
      print "consumed #{value}\n"
      next_run = :producer
      cond_producer.signal
    end
  end
end

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do
      until next_run == :producer
          cond_producer.wait(mutex)
      end
      queue << i
      print "#{i} produced\n"
      next_run = :consumer
      cond_consumer.signal
    end
  end
end

threads.each(&:join)
于 2012-10-26T11:37:06.067 回答
0

您可以简化您的问题:

require 'thread'
queue = Queue.new
consumer = Thread.new { queue.pop }
consumer.join

因为您的主线程正在等待消费者线程退出,但消费者线程正在休眠(由于queue.pop),这导致:

producer-consumer.rb:4:in `join': deadlock detected (fatal)
    from producer-consumer.rb:4:in `<main>'

所以你必须等待线程完成而不调用join

require 'thread'

queue = Queue.new
threads = []

threads << Thread.new do
  5.times do |i|
    value = queue.pop
    puts "consumed #{value}"
  end
end

threads << Thread.new do
  5.times do |i|
    queue << i
    puts "#{i} produced"
    sleep(1) # simulate expense
  end
end

# wait for the threads to finish
sleep(1) while threads.any?(&:alive?)
于 2012-10-26T13:03:01.173 回答