4

我在 ruby​​ 中编写了一些代码来通过线程池处理数组中的项目。在这个过程中,我预先分配了一个与传入数组大小相同的结果数组。在线程池中,我在预分配数组中分配项目,但这些项目的索引保证是唯一的。考虑到这一点,我是否需要在作业周围加上Mutex#synchronize?

例子:

SIZE = 1000000000
def collect_via_threadpool(items, pool_count = 10)
  processed_items = Array.new(items.count, nil)
  index = -1
  length = items.length
  mutex = Mutex.new
  items_mutex = Mutex.new
  [pool_count, length, 50].min.times.collect do
    Thread.start do
        while (i = mutex.synchronize{index = index + 1}) < length do


          processed_items[i] = yield(items[i])
          # ^ do I need to synchronize around this? `processed_items` is preallocated

        end
    end
  end.each(&:join)
  processed_items
end

items = collect_via_threadpool(SIZE.times.to_a, 100) do |item|
  item.to_s
end

raise unless items.size == SIZE

items.each_with_index do |item, index|
  raise unless item.to_i == index
end

puts 'success'

(此测试代码需要很长时间才能运行,但似乎每次都打印“成功”。)

为了安全起见,我似乎想将其包围Array#[]=起来Mutex#synchronize,但我的问题是:

在 Ruby 的规范中,这段代码是否被定义为安全的?

4

1 回答 1

1

Ruby 中没有任何东西被指定为线程安全的Mutex(因此任何从它派生的东西)。如果您想知道您的特定代码是否是线程安全的,您需要查看您的实现如何处理线程和数组。

对于 MRI,调用Array.new(n, nil)实际上会为整个数组分配内存,因此如果您的线程保证不共享索引,您的代码将正常工作。这就像让多个线程在没有互斥锁的情况下对不同的变量进行操作一样安全。

但是对于其他实现,Array.new(n, nil)可能不会分配整个数组,并且稍后分配给索引可能涉及重新分配和内存副本,这可能会造成灾难性的破坏。

因此,尽管您的代码可能有效(至少在 MRI 中),但不要依赖它。当我们讨论这个话题时,甚至没有指定 Ruby 的线程实际上是并行运行的。因此,如果您试图避免互斥锁,因为您认为您可能会看到一些性能提升,也许您应该重新考虑您的方法。

于 2014-10-28T16:01:47.020 回答