2

我想跳过从map_async. 它们在内存中增长,但我不需要它们。

这是一些代码:

def processLine(line):
    #process something
    print "result"
pool = Pool(processes = 8)
for line in sys.stdin:
    lines.append(line)
    if len(lines) >= 100000:
        pool.map_async(processLine, lines, 2000)
pool.close()
pool.join()

当我必须处理具有数亿行的文件时,python 进程在内存中增长到几 GB。我该如何解决?

谢谢你的帮助 :)

4

2 回答 2

3

您的代码有一个错误:

for line in sys.stdin:
    lines.append(line)
    if len(lines) >= 100000:
        pool.map_async(processLine, lines, 2000)

这要等到lines累积超过100000行。之后,在每条附加行pool.map_async的 100000+ 行的整个列表中被调用。

目前尚不清楚您真正想要做什么,但如果您不想要返回值,请使用pool.apply_async,而不是pool.map_async。也许是这样的:

import multiprocessing as mp

def processLine(line):
    #process something
    print "result"

if __name__ == '__main__':
    pool = mp.Pool(processes = 8)
    for line in sys.stdin:
        pool.apply_async(processLine, args = (line, ))
    pool.close()
    pool.join()
于 2012-12-17T15:17:50.710 回答
0

你是对的。有一些错误

我是说:

def processLine(line):
  #process something
  print "result"
  pool = Pool(processes = 8)

if __name__ == '__main__':
  for line in sys.stdin:
    lines.append(line)
    if len(lines) >= 100000:
      pool.map_async(processLine, lines, 2000)
      lines = [] #to clear buffer
  pool.map_async(processLine, lines, 2000)
  pool.close()
  pool.join()

我使用 map_async 是因为它具有可配置的 chunk_size ,因此如果有很多行处理时间很短,它会更有效。

于 2012-12-17T22:50:45.543 回答