2

我有一个从多处理池写入的 gzip 文件句柄。不幸的是,输出文件似乎在某个时间点后损坏了,因此执行以下操作zcat out | wc

gzip: out: invalid compressed data--format violated

我正在通过不使用 gzip 来解决这个问题。但我很好奇为什么会发生这种情况以及是否有任何解决方案。

不确定这是否重要,但我在我无法控制的远程 linux 机器上运行代码,但我的猜测是它是一台 ubuntu 机器。Python 2.7.3

这是稍微简化的代码:

lock = Lock()
ohandle = gzip.open("out", "w")
def process(fn):
  rv = []
  for l in open(fn):
    sometext = dosomething(l)
    rv.append(sometext)


  lock.acquire()
  for sometext in rv:
    print >> ohandle, sometext
  lock.release()

pool = Pool(processes=4)
pm = pool.map(process, some_file_list])
ohandle.close()
4

1 回答 1

0

请参阅http://docs.python.org/2/library/multiprocessing.html#programming-guidelines

  • 你应该用if __name__ == '__main__'. 或者该部分将由子进程运行。
  • 将资源显式传递给子进程。( ohandle, lock)

我将您的代码修改为不使用 lock 并且不共享ohandle。相反,我使用了临时文件。( fn + '.temp')

注意:您应该检查文件名。如果有任何带有“.temp”后缀的文件,我的代码可能会删除您的数据。


import os


def process(fn):
    out_fn = fn + '.temp'
    with open(fn) as f, open(out_fn, 'w') as f2:
        for l in f:
            sometext = dosomething(l)
            print >> f2, sometext
    return out_fn

if __name__ == '__main__':
    some_file_list = ...
    pool = Pool(processes=4)

    ohandle = gzip.open('out.gz', 'w')
    for fn in pool.map(process, some_file_list):
        with open(fn) as f:
            while True:
                data = f.read(1<<12)
                if not data: break
                ohandle.write(data)
        os.unlink(fn)
    pool.close()
    pool.join()
于 2013-06-10T03:17:22.797 回答