1

我有一个执行广度优先搜索资源的算法:

def crawl(starting_node)
  items=[starting_node]
  until items.empty?
    item = items.shift
    kids = item.slow_network_action # takes seconds
    kids.each{ |kid| items << kid }
  end
end

我想使用一些并发线程来并行化 slow_network_action。
这样做的合理方法是什么?

这是一种有效的技术,但我觉得肯定不是正确的方法:

def crawl(starting_node)
  mutex = Mutex.new
  items = [starting_node]
  4.times.map{
    loop do
      unless item=mutex.synchronize{ items.shift }
        sleep LONGER_THAN_LONGEST_NETWORK_ACTION
        break unless item=mutex.synchronize{ items.shift }
      end
      kids = item.slow_network_action
      mutex.synchronize{
        kids.each{ |kid| items << kid }
      }
    end
  }.each(&:join)
end

我想做一些事情,比如让线程在等待将项目添加到队列时实际休眠,在添加项目时唤醒,并在每个人都在等待时让所有线程退出,而没有添加任何线程。


这个替代代码几乎可以工作,但是对于可能(并且确实)发生的死锁,以及完全缺乏适当的退出策略:

require 'thread'
def crawl(starting_node)
  items = Queue.new
  items << starting_node
  4.times.map{
    while item=items.shift
      kids = item.slow_network_action
      kids.each{ |kid| items << kid }
    end
  }.each(&:join)
end
4

1 回答 1

2

这应该为您指明正确的方向:

require 'monitor'

NUM_THREADS = 4

def crawl(starting_node)
  items = [starting_node]
  items.extend MonitorMixin
  item_cond = items.new_cond

  threads = []
  working_threads = 0
  finished = false

  NUM_THREADS.times do
    items.synchronize do
      working_threads += 1
    end
    threads << Thread.new do
      item = nil
      kids = []
      loop do
        items.synchronize do

          #add any new items to array
          items.concat kids

          if (items.empty? && working_threads == 1)
            #all other threads are waiting, and there's no more items
            #to process, so we must be done
            finished = true
          end

          #wake up all waiting threads, either to finish or do more work
          #watch out for thundering herds
          item_cond.broadcast unless (items.empty? && !finished)

          #wait, but first decrement count of working threads
          #so we can determine when to finish
          working_threads -= 1
          item_cond.wait_while { items.empty? && !finished}
          Thread.exit if finished
          working_threads += 1

          #get next item
          item = items.shift
        end

        kids = item.slow_network_action
      end

    end
  end

  threads.each(&:join)
end

这使items阵列成为监视器,并通过它进行任何同步,以及ConditionVariable从监视器创建的关联。

这类似于 aQueue内部的工作方式,除了它还会检查所有工作何时完成(这实际上增加了一些复杂性)。

线程主循环以一个空kids数组开始,该数组被添加到其中items以避免在循环中需要两个单独的同步块,以及随之而来的竞争条件。

请注意,这broadcast会导致所有等待线程唤醒,并可能导致雷鸣般的羊群。我不认为这应该在这里造成任何问题。另一种方法是一次添加一个元素kids,并调用signal每个元素。当所有工作都完成时,这会增加处理案件的复杂性。

于 2012-04-27T21:35:07.630 回答