1

我正在尝试按照以下示例在 ruby​​ 中进行多线程循环:http: //taw.blogspot.com/2010/05/very-simple-parallelization-with-ruby.html

我复制了那个编码并写了这个:

module Enumerable

    def ignore_exception
        begin
            yield
        rescue Exception => e
            STDERR.puts e.message
        end
    end
    def in_parallel(n)
        t_queue = Queue.new
        threads = (1..n).map {
            Thread.new{ 
                while x = t_queue.deq
                    ignore_exception{ yield(x[0]) }
                end
            }
        }
        each{|x| t_queue << [x]}
        n.times{ t_queue << nil }
        threads.each{|t| 
            t.join 
            unless t[:out].nil?
                puts t[:out]
            end
        }
    end
end

ids.in_parallel(10){ |id|
 conn = open_conn(loc)
 out = conn.getData(id)
 Thread.current[:out] = out
}

我理解它的方式是一次将 10 个项目出队,处理每个 id 循环中的块并在最后加入 10 个线程,并重复直到完成。运行此代码后,有时我会得到不同的结果,特别是如果我的 id 的大小小于 10,我很困惑为什么会发生这种情况。即使我可以在服务器端检查这些 id 的输出是否存在,它也有一半的时间不会为多达一半的 id 输出任何内容。例如,如果正确的输出是 "Got id 1" 和 "Got id 2",它只会打印出 {"Got id 1"} 或 {"Got id 2"} 或 {"Got id 1", "Got id 2"}。我的问题是我对这段代码的理解是否正确?

4

1 回答 1

0

我的代码中的问题是open_conn()函数调用,它不是线程安全的。我通过同步获取连接句柄解决了这个问题:

connLock = Mutex.new
ids.in_parallel(10){ |id|
 conn = nil
 connLock.synchronize {
    conn = open_conn(loc)
 }
 out = conn.getData(id)
 Thread.current[:out] = out
}

还应该使用http://peach.rubyforge.org/ 进行循环并行化,方法是:

 ids.peach(10){ |id| ... }
于 2013-09-18T17:58:22.443 回答