1

我正在尝试将从大文件的行中提取的信息发送到某个服务器上运行的进程。

为了加快速度,我想用一些并行线程来做到这一点。

使用concurrent.futures的 Python 2.7 backport我试过这个:

f = open("big_file")
with ThreadPoolExecutor(max_workers=4) as e:
    for line in f:
        e.submit(send_line_function, line)
f.close()

但是,这是有问题的,因为所有期货都会立即提交,因此我的机器内存不足,因为完整的文件被加载到内存中。

我的问题是,是否有一种简单的方法可以仅在有免费工人可用时才提交新的未来。

4

1 回答 1

1

您可以使用迭代文件的块

for chunk in zip(*[f]*chunksize):

(这是grouper recipe的一个应用程序,它将迭代器中的项目收集f到 size 的组中chunksize。注意:这不会立即消耗整个文件,因为zip在 Python3 中返回一个迭代器。)


import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    for chunk in zip(*[f]*chunksize):
        futures = [executor.submit(worker, line) for line in chunk]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)

现在,在评论中你正确地指出这不是最佳的。可能有一些工人需要很长时间,并且需要大量的工作。

通常,如果每次调用 worker 所花费的时间大致相同,那么这没什么大不了的。但是,这是一种按需推进文件句柄的方法。它使用 athreading.Condition来通知sprinkler推进文件句柄。

import logging
import threading
import Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
SENTINEL = object()

def worker(cond, queue):
    for line in iter(queue.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            cond.notify()
            logger.info('notify')

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle') 
            with cond:
                queue.put(line)
                logger.info('waiting')
                cond.wait()
        for _ in range(num_workers):
            queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
t.start()

threads = [threading.Thread(target=worker, args=[cond, queue])]
for t in threads:
    t.start()
for t in threads:
    t.join()
于 2013-09-12T17:26:25.860 回答