2

我有一个包含很多行的文件(比如 10 亿行)。一个脚本正在遍历所有这些行以将它们与另一个数据集进行比较。

由于目前它在 1 个线程/1 个核心上运行,我想知道是否可以启动多个分叉,每个分叉同时处理文件的一部分。

到目前为止,我想到的唯一解决方案是sedunix 命令。使用 sed 可以读取文件的“切片”(第 x 行到第 y 行)。因此,几个 fork 可以处理相应 seds 的输出。然而问题是 Ruby 会首先将整个 sed 输出加载到 RAM 中。

有没有比 sed 更好的解决方案,或者有没有办法将 sed 输出“流式传输”到 Ruby 中?

4

2 回答 2

2

您可以使用fork或来执行此操作threads。在这两种情况下,您都必须编写一些东西来管理它们,并确定需要多少子进程,以及每个文件应该处理多少行。

因此,对于第一段代码,您需要扫描文件并确定它包含多少行。如果您在 *nix 或 Mac OS 上,您可以使用以下命令执行此操作:

lc = `wc -l path/to/file`.to_i

或者通过简单地打开文件并在读取行时增加一个计数器。Ruby 在这方面非常快,但在包含“60 亿”行的文件上,wc可能是更好的选择:

lc = 0
File.foreach('path/to/file') { lc += 1 }

将其除以您要管理的子流程的数量:

NUM_OF_PROCESSES = 5
lines_per_process = lc/NUM_OF_PROCESSES

然后启动您的流程,告诉他们从哪里开始处理,以及多少行:

require 'threads'
children = []
1.step(lc, lines_per_process) do |start_line|
  children << Thread.new do
    cur_line = 0
    File.foreach('path/to/file') do |li|
      cur_line += 1
      next unless (cur_line === start_line .. (start_line + lines_per_process)
      # ... do something with the lines read
    end
  end
end

# wait for them to finish
children.each { |c| c.join }

这是未经测试的,但这是我要开始的地方。

于 2013-05-09T05:10:40.477 回答
2

你所要求的实际上不会帮助你。

首先,要跳转到文件的第 n 行,您首先必须读取文件的前一部分,以计算其中的换行数。例如:

$ ruby -e '(1..10000000).each { |i| puts "This is line number #{i}"}' > large_file.txt
$ du -h large_file.txt
 266M   large_file.txt
$ purge # mac os x command - clears any in memory disk caches in use
$ time sed -n -e "5000000p; 5000000q" large_file.txt
This is line number 5000000
sed -n -e "5000000p; 5000000q" large_file.txt  0.52s user 0.13s system 28% cpu 2.305 total
$ time sed -n -e "5000000p; 5000000q" large_file.txt
This is line number 5000000
sed -n -e "5000000p; 5000000q" large_file.txt  0.49s user 0.05s system 99% cpu 0.542 total

请注意该sed命令不是即时的,它必须通读文件的初始部分才能确定第 5 百万行的位置。这就是为什么第二次运行它对我来说要快得多 - 我的计算机将文件缓存到内存中。

即使您确实完成了此操作(通过手动拆分文件),如果您不断地在一个或多个文件的不同部分之间跳转以读取下一行,您的 IO 性能也会很差。


更好的是在单独的线程(或进程)上处理每第 n 行。这将允许使用多个 cpu 内核,但仍然具有良好的 IO 性能。这可以通过并行库轻松完成。

使用示例(我的电脑有 4 个核心):

$ ruby -e '(1..10000000).each { |i| puts "This is line number #{i}"}' > large_file.txt # use a smaller file to speed up the tests
$ time ruby -r parallel -e "Parallel.each(File.open('large_file.txt').each_line, in_processes: 4) { |line| puts line if (line * 10000) =~ /9999/ }"
This is line number 9999
This is line number 19999
This is line number 29999
This is line number 39999
This is line number 49999
This is line number 59999
This is line number 69999
This is line number 79999
This is line number 89999
This is line number 99990
This is line number 99991
This is line number 99992
This is line number 99993
This is line number 99994
This is line number 99995
This is line number 99996
This is line number 99997
This is line number 99999
This is line number 99998
ruby -r parallel -e   55.84s user 10.73s system 400% cpu 16.613 total

$ time ruby -r parallel -e "Parallel.each(File.open('large_file.txt').each_line, in_processes: 1) { |line| puts line if (line * 10000) =~ /9999/ }"
This is line number 9999
This is line number 19999
This is line number 29999
This is line number 39999
This is line number 49999
This is line number 59999
This is line number 69999
This is line number 79999
This is line number 89999
This is line number 99990
This is line number 99991
This is line number 99992
This is line number 99993
This is line number 99994
This is line number 99995
This is line number 99996
This is line number 99997
This is line number 99998
This is line number 99999
ruby -r parallel -e   47.04s user 7.46s system 97% cpu 55.738 total

第二个版本(使用 4 个进程)完成了原来的 29.81% 的时间,快了近 4 倍。

于 2013-05-09T05:25:43.143 回答