我对python相当陌生。我正在使用多处理模块读取标准输入上的文本行,以某种方式转换它们并将它们写入数据库。这是我的代码片段:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
现在一切正常,直到我开始处理巨大的输入文件(数亿行),然后通过管道传输到我的 python 程序中。在某些时候,当我的数据库变慢时,我看到内存已满。
玩了一会儿,发现 pool.apply_async 和 pool.map_async 从来没有阻塞过,所以要处理的调用队列越来越大。
解决我的问题的正确方法是什么?我希望我可以设置一个参数,一旦达到某个队列长度,它将阻止 pool.apply_async 调用。Java 中的 AFAIR 可以为此目的给 ThreadPoolExecutor 一个具有固定长度的 BlockingQueue。
谢谢!