4

我可能会问一个非常基本的问题,但我真的不知道如何在 python 中制作一个简单的并行应用程序。我在一台有 16 个内核的机器上运行我的脚本,我想有效地使用它们。我有 16 个大文件要读取,我希望每个 cpu 读取一个文件,然后合并结果。在这里,我举一个简单的例子来说明我想做的事情:

  parameter1_glob=[]
  parameter2_glob[]


  do cpu in arange(0,16):
      parameter1,parameter2=loadtxt('file'+str(cpu)+'.dat',unpack=True)

      parameter1_glob.append(parameter1)
      parameter2_glob.append(parameter2)

我认为该multiprocessing模块可能会有所帮助,但我无法理解如何将其应用于我想做的事情。

4

3 回答 3

2

我同意 Colin Dunklau 在他的评论中所说的,这个过程会成为读写这些文件的瓶颈,对 CPU 的需求是最小的。即使您有 17 个专用驱动器,您也不会最大限度地使用一个内核。此外,虽然我意识到这与您的实际问题无关,但您可能会遇到这些“巨大”文件的内存限制 - 将 16 个文件作为数组加载到内存中,然后将它们组合到另一个文件中几乎肯定会占用比你有。

查看 shell 脚本这个问题,您可能会发现更好的结果。特别是,GNUsort使用内存高效的合并排序来非常快速地对一个或多个文件进行排序 - 比除了最精心编写的 Python 或大多数其他语言的应用程序之外的所有应用程序都要快得多。

我建议避免任何形式的多线程工作,这将大大增加复杂性,而收益却很小。确保一次只在内存中保留尽可能少的文件,否则很快就会用完。无论如何,您绝对希望在两个单独的磁盘上运行读取和写入。与同时读取和写入同一个磁盘相关的减速是非常痛苦的。

于 2012-07-26T15:30:50.063 回答
1

你想逐行合并吗?有时协程对于 I/O 绑定的应用程序比经典的多任务处理更有趣。您可以为各种路由、合并和广播链接生成器和协程。David Beazley 的精彩演讲让您大开眼界。

您可以使用协程作为接收器(未经测试,请参阅 dabeaz 示例):

# A sink that just prints the lines
@coroutine
def printer():
    while True:
        line = (yield)
        print line,

sources = [
    open('file1'),
    open('file2'),
    open('file3'),
    open('file4'),
    open('file5'),
    open('file6'),
    open('file7'),
]

output = printer()
while sources:
    for source in sources:
        line = source.next()
        if not line: # EOF
            sources.remove(source)
            source.close()
            continue
        output.send(line)
于 2012-07-26T15:55:21.870 回答
0

假设每个文件的结果都很小,你可以用我的包jug做到这一点:

from jug import TaskGenerator
loadtxt = TaskGenerator(loadtxt)

parameter1_glob=[]
parameter2_glob[]

@TaskGenerator
def write_parameter(oname, ps):
    with open(oname, 'w') as output:
        for p in ps:
            print >>output, p

parameter1_glob = []
parameter2_glob = []

for cpu in arange(0,16):
    ps = loadtxt('file'+str(cpu)+'.dat',unpack=True)
    parameter1_glob.append(ps[0])
    parameter2_glob.append(ps[1])

write_parameter('output1.txt', parameter1_glob)
write_parameter('output2.txt', parameter2_glob)

现在,您可以执行多个jug execute作业。

于 2012-09-12T15:11:55.300 回答