You can iterate over chunks of the file with

for chunk in zip(*[f]*chunksize):

(This is an application of the grouper recipe, which collects items from the iterator f into groups of size chunksize. Note: this does not consume the entire file at once, since zip returns an iterator in Python 3. Plain zip does, however, silently drop a final partial group, so the full example below uses itertools.zip_longest instead.)
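As a quick illustration of the grouper behavior (a small sketch of my own, not part of the original answer):

import itertools as IT

# zip pulls from the same iterator three times per output tuple,
# so it yields groups of 3; the leftover item (9) is silently dropped.
it = iter(range(10))
print(list(zip(*[it] * 3)))             # [(0, 1, 2), (3, 4, 5), (6, 7, 8)]

# zip_longest keeps the final partial group, padded with None.
it = iter(range(10))
print(list(IT.zip_longest(*[it] * 3)))  # [..., (9, None, None)]

The full example: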
import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    # zip_longest (unlike plain zip) keeps the final, partial chunk;
    # the None padding it adds is filtered out before submitting.
    for chunk in IT.zip_longest(*[f] * chunksize):
        futures = [executor.submit(worker, line)
                   for line in chunk if line is not None]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)
Now, in the comments you rightly point out that this is suboptimal: one slow worker can hold up a whole chunk of jobs. Usually, if each call to worker takes roughly the same amount of time, this is not a big deal. However, here is a way to advance the filehandle only on demand. It uses a threading.Condition to notify the sprinkler to advance the filehandle.
import logging
import threading
from queue import Queue  # this module was named Queue in Python 2

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

SENTINEL = object()

def worker(cond, queue):
    # keep pulling lines until the sentinel arrives
    for line in iter(queue.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            # tell the sprinkler we are ready for another line
            cond.notify()
            logger.info('notify')

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle')
            with cond:
                queue.put(line)
                logger.info('waiting')
                # block until a worker notifies; only then read the next line
                cond.wait()
    # one sentinel per worker so every worker shuts down
    for _ in range(num_workers):
        queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue()

sprinkler_thread = threading.Thread(target=sprinkler,
                                    args=[cond, queue, num_workers])
sprinkler_thread.start()

threads = [threading.Thread(target=worker, args=[cond, queue])
           for _ in range(num_workers)]
for t in threads:
    t.start()
for t in threads:
    t.join()
sprinkler_thread.join()
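For comparison (my own sketch, not from the original answer): a bounded queue gives the same on-demand advancement with less machinery, because Queue.put blocks once maxsize items are pending, so the file is only read as fast as the workers consume it:

import logging
import threading
from queue import Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

SENTINEL = object()
num_workers = 4

def worker(queue):
    for line in iter(queue.get, SENTINEL):
        logger.info(line.strip())

# maxsize=1 makes queue.put block until a worker has taken the
# previous line, so the filehandle only advances on demand.
queue = Queue(maxsize=1)
threads = [threading.Thread(target=worker, args=[queue])
           for _ in range(num_workers)]
for t in threads:
    t.start()

with open("big_file") as f:
    for line in f:
        queue.put(line)  # blocks while the queue is full

for _ in range(num_workers):
    queue.put(SENTINEL)
for t in threads:
    t.join()

The tradeoff is that at most one line is buffered ahead of the workers; raising maxsize buffers more lines while still bounding memory.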