0

我知道有像 Parallel 这样的伟大宝石,但我想出了下面的课程作为练习。

它工作正常,但是当进行大量迭代时,有时 Ruby 会“卡住”。当按下 CTRL+CI 时,可以从回溯中看到它总是在第 38 行或第 45 行(两条 Marshal 行)。你能看出这里有什么问题吗?管道似乎“挂起”,所以我想我可能以错误的方式使用它们。

我的目标是使用有限数量的分叉 (max_forks) 遍历一个数组(我将其作为“对象”传递)并返回一些值。此外,我想保证当父母被杀时所有的孩子都会被杀死(即使是在 kill -9 的情况下),这就是我引入“life_line”管道的原因(我在 Stackoverflow 上读到这可能会奏效) .

class Parallel

  def self.do_fork(max_forks, objects)
    waiter_threads = []
    fork_counter = []

    life_line = {}
    comm_line = {}

    objects.each do |object|
      key = rand(24 ** 24).to_s(36)

      sleep(0.01) while fork_counter.size >= max_forks

      if fork_counter.size < max_forks
        fork_counter << true

        life_line[key] = {}
        life_line[key][:r], life_line[key][:w] = IO.pipe

        comm_line[key] = {}
        comm_line[key][:r], comm_line[key][:w] = IO.pipe

        pid = fork {
          life_line[key][:w].close
          comm_line[key][:r].close

          Thread.new {
            begin
              life_line[key][:r].read
            rescue SignalException, SystemExit => e
              raise e
            rescue Exception => e
              Kernel.exit
            end
          }

          Marshal.dump(yield(object), comm_line[key][:w]) # return yield
        }

        waiter_threads << Thread.new {
          Process.wait(pid)

          comm_line[key][:w].close
          reply = Marshal.load(comm_line[key][:r])
          # process reply here
          comm_line[key][:r].close

          life_line[key][:r].close
          life_line[key][:w].close
          life_line[key] = nil

          fork_counter.pop 
        }
      end
    end

    waiter_threads.each { |k| k.join } # wait for all threads to finish
  end
end
4

2 回答 2

1

错误是这样的:

管道只能处理一定数量的数据(例如 64 KB)。一旦你写得更多,管道将永远“卡住”。

一个简单的解决方案是在开始写入之前读取线程中的管道。

comm_line = IO.pipe

# Buffered Pipe Reading (in case bigger than 64 KB)
reply = ""
read_buffer = Thread.new {
  while !comm_line[0].eof?
    reply = Marshal.load(comm_line[0])
  end
}

child_pid = fork {
  comm_line[0].close
  comm_line[0].write "HUGE DATA LARGER THAN 64 KB"
}

Process.wait(child_pid)

comm_line[1].close
read_buffer.join
comm_line[0].close

puts reply # outputs the "HUGE DATA"
于 2013-06-17T10:10:07.427 回答
0

我不认为马歇尔有问题。更明显的一个似乎是您的 fork 可能在等待者线程到达它之前完成执行(导致后者永远等待)。

尝试更改Process.wait(pid)Process.wait(pid, Process::WNOHANG). Process::WNOHANG如果没有可用的子级(匹配给定的 PID,如果有的话),该标志指示 Ruby 不挂起。请注意,这可能并非在所有平台上都可用,但至少应该在 Linux 上运行。

您的代码还有许多其他潜在问题,但如果您只是“作为练习”提出它,它们可能无关紧要。例如,Marshal.load 不喜欢遇到 EOF,因此如果您希望读取多个对象,我可能会通过说类似Marshal.load(comm_line[key][:r]) unless comm_line[key][:r].eof?或循环来防止这些。until comm_line[key][:r].eof?

于 2013-06-11T12:10:03.167 回答