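In a Ruby application, I have a bunch of tasks that share no state, and I want to start several of them at once. Crucially, I don't care about the order in which they start, nor about their return values (each one triggers a database transaction before it finishes). I know that, depending on my Ruby implementation, the GIL may prevent these tasks from actually running at the same time, but that's fine, because I'm not really interested in true concurrency: these worker threads will be IO-bound on network requests anyway.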

So far, this is what I've got:

def asyncDispatcher(numConcurrent, stateQueue, &workerBlock)
  workerThreads = []

  while not stateQueue.empty?
    while workerThreads.length < numConcurrent
      nextState = stateQueue.pop

      nextWorker =
        Thread.new(nextState) do |st|
          workerBlock.call(st)
        end

      workerThreads.push(nextWorker)
    end # inner while

    workerThreads.delete_if{|th| not th.alive?} # clean up dead threads
  end # outer while

  workerThreads.each{|th| th.join} # join any remaining workers
end # asyncDispatcher

I call it like this:

asyncDispatcher(2, (1..10).to_a ) {|x| x + 1}

Are there any lurking bugs or concurrency pitfalls here? Or is there something in the runtime that would simplify this task?

1 Answer

Use a Queue:

require 'thread'

def asyncDispatcher(numWorkers, stateArray, &processor)
  q = Queue.new
  threads = []

  (1..numWorkers).each do |worker_id|
    threads << Thread.new(processor, worker_id) do |processor, worker_id|
      while true
        next_state = q.shift      #shift() blocks if q is empty, which is the case now
        break if next_state == q  #Some sentinel that won't appear in your data
        processor.call(next_state, worker_id)
      end
    end
  end

  stateArray.each {|state| q.push state}
  numWorkers.times {q.push q}    #One sentinel per worker is enough: each thread breaks on the first one it sees

  threads.each(&:join)
end


asyncDispatcher(2, (1..10).to_a) do |state, worker_id|
  time = sleep(Random.rand 10)  #How long it took to process state
  puts "#{state} is finished being processed: worker ##{worker_id} took #{time} secs."
end

--output:--
2 is finished being processed: worker #1 took 4 secs.
3 is finished being processed: worker #1 took 1 secs.
1 is finished being processed: worker #2 took 7 secs.
5 is finished being processed: worker #2 took 1 secs.
6 is finished being processed: worker #2 took 4 secs.
7 is finished being processed: worker #2 took 1 secs.
4 is finished being processed: worker #1 took 8 secs.
8 is finished being processed: worker #2 took 1 secs.
10 is finished being processed: worker #2 took 3 secs.
9 is finished being processed: worker #1 took 9 secs.
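As an aside, here is a minimal sketch of the same worker-pool idea without a sentinel object. It assumes Ruby 2.3 or later, where `Queue#close` is available: once a closed queue is drained, `shift` returns `nil` instead of blocking. The name `dispatch_with_close` is hypothetical, and the sketch assumes `nil` and `false` never occur as real states.

```ruby
require 'thread'

# Hypothetical variant of asyncDispatcher; assumes Ruby 2.3+ for Queue#close.
def dispatch_with_close(num_workers, states, &processor)
  queue = Queue.new
  states.each { |state| queue.push(state) }
  queue.close  # no further pushes; shift returns nil once the queue is empty

  threads = (1..num_workers).map do |worker_id|
    Thread.new do
      # A nil from shift means the closed queue is drained, so the worker exits.
      while (state = queue.shift)
        processor.call(state, worker_id)
      end
    end
  end

  threads.each(&:join)
end

# Usage: collect results in a second Queue, which is safe to push to from threads.
results = Queue.new
dispatch_with_close(2, (1..10).to_a) { |state, _worker_id| results << state + 1 }
processed = Array.new(results.size) { results.pop }.sort
```

Filling the queue before starting the workers is a design choice of this sketch; the version above starts the workers first and lets them block on an empty queue.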

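OK, OK, someone will look at the worker output above and shout:

"Hey, #2 took a total of 13 seconds to finish four jobs in a row, while #1 took only 8 seconds for one job, so the output for #1's 8-second job should have come sooner. There's no thread switching in Ruby! Ruby is broken!"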

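Well, while #1 was sleeping through its first two jobs for a total of 5 seconds, #2 was sleeping at the same time, so when #1 finished its first two jobs, #2 had only 2 seconds of sleep remaining on its 7-second job. So replace #2's 7 seconds with 2 seconds, and you'll see that after #1 finished its first two jobs, #2 took a total of 8 seconds (2 + 1 + 4 + 1) to run four jobs in a row, which ties it with #1's 8-second job.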
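The arithmetic can be checked with a small sketch that rebuilds the timeline from the sample output. It assumes both workers start at t = 0 and pick up the next job the moment they finish the previous one; the names `jobs_by_worker` and `finished_at` are illustrative only.

```ruby
# Per-worker job lists as [state, seconds], copied from the sample output
# in the order each worker printed them.
jobs_by_worker = {
  1 => [[2, 4], [3, 1], [4, 8], [9, 9]],
  2 => [[1, 7], [5, 1], [6, 4], [7, 1], [8, 1], [10, 3]]
}

# Assumption: each worker runs its jobs back to back, so a job's finish time
# is the running sum of that worker's sleep durations.
finished_at = {}
jobs_by_worker.each_value do |jobs|
  elapsed = 0
  jobs.each do |state, secs|
    elapsed += secs
    finished_at[state] = elapsed
  end
end

finished_at.sort_by { |_state, t| t }.each do |state, t|
  puts "state #{state} finished at t=#{t}s"
end
```

Under these assumptions, state 7 (worker #2) and state 4 (worker #1) both finish at t = 13 s, which is why their lines appear next to each other in the output.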

answered 2013-07-06T12:09:25.113