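Suppose I have N generators that each produce a stream of items:

    gs = [..]  # list of generators

I can easily zip them together to get a generator of tuples, with one item from each generator in gs:

    tuple_gen = zip(*gs)

This calls next(g) on each g in gs in sequence and gathers the results in a tuple. But if each item is costly to produce, we may want to parallelize the next(g) calls across multiple threads.

How can I implement a pzip(..) that does this?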
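To make the sequential behaviour concrete, here is a small sketch of the baseline. The slow_gen helper and its delay parameter are hypothetical stand-ins for "expensive" generators (the cost is simulated with time.sleep); they are not part of any real workload.

```python
import time


def slow_gen(name, n, delay=0.1):
    """Hypothetical generator: each item costs `delay` seconds to produce."""
    for i in range(n):
        time.sleep(delay)  # stand-in for expensive work
        yield f"{name}{i}"


gs = [slow_gen("a", 3), slow_gen("b", 3), slow_gen("c", 3)]

start = time.perf_counter()
# zip() calls next() on each generator one after another, so the
# per-item delays add up: 3 generators * 3 items * 0.1 s each.
rows = list(zip(*gs))
elapsed = time.perf_counter() - start

print(rows)
print(f"sequential zip took about {elapsed:.2f} s")
```

With nine items at 0.1 s each, the loop cannot finish in much less than 0.9 s, which is the cost a parallel version would try to cut.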
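What you are asking for can be achieved by creating a generator that yields the results of apply_async calls on a thread pool.

FYI, I benchmarked this approach with the iterators that pandas.read_csv returns when you specify the chunksize parameter. I created eight copies of a CSV file with 1M rows and specified chunksize=100_000.
Four of the files were read with the sequential method you provided, and four with the mt_gen function below, using a pool of four threads:
- single-threaded ~ 3.68 s
- multi-threaded ~ 1.21 s
That doesn't mean it will improve results for every hardware and data setup, though. Threads only help when most of the time spent in next(g) is in code that releases the GIL, such as I/O or C extensions; pure-Python CPU-bound generators will see little or no speedup.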
import time
import threading
from multiprocessing.dummy import Pool  # dummy uses threads


def _load_sim(x=10e6):
    # Simulate an expensive item: a CPU-bound loop followed by a blocking wait.
    for _ in range(int(x)):
        x -= 1
    time.sleep(1)


def gen(start, stop):
    for i in range(start, stop):
        _load_sim()
        print(f'{threading.current_thread().name} yielding {i}')
        yield i


def multi_threaded(gens):
    combi_g = mt_gen(gens)
    for item in combi_g:
        print(item)


def mt_gen(gens):
    with Pool(N_WORKERS) as pool:
        while True:
            # Schedule one next(g) call per generator on the thread pool.
            async_results = [pool.apply_async(next, args=(g,)) for g in gens]
            try:
                # Collect in submission order so positions match the generators.
                results = [r.get() for r in async_results]
            except StopIteration:  # needed for Python 3.7+, PEP 479, bpo-32670
                return
            yield results


if __name__ == '__main__':

    N_GENS = 10
    N_WORKERS = 4
    GEN_LENGTH = 3

    gens = [gen(x * GEN_LENGTH, (x + 1) * GEN_LENGTH) for x in range(N_GENS)]
    multi_threaded(gens)
Output:
Thread-1 yielding 0
Thread-2 yielding 3
Thread-4 yielding 6
Thread-3 yielding 9
Thread-1 yielding 12
Thread-2 yielding 15
Thread-4 yielding 18
Thread-3 yielding 21
Thread-1 yielding 24
Thread-2 yielding 27
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Thread-3 yielding 7
Thread-1 yielding 10
Thread-2 yielding 4
Thread-4 yielding 1
Thread-3 yielding 13
Thread-1 yielding 16
Thread-4 yielding 22
Thread-2 yielding 19
Thread-3 yielding 25
Thread-1 yielding 28
[1, 4, 7, 10, 13, 16, 19, 22, 25, 28]
Thread-1 yielding 8
Thread-4 yielding 2
Thread-3 yielding 11
Thread-2 yielding 5
Thread-1 yielding 14
Thread-4 yielding 17
Thread-3 yielding 20
Thread-2 yielding 23
Thread-1 yielding 26
Thread-4 yielding 29
[2, 5, 8, 11, 14, 17, 20, 23, 26, 29]
Process finished with exit code 0
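
If you want something closer to a drop-in replacement for zip (arbitrary iterables in, tuples out), the same idea can be sketched with concurrent.futures.ThreadPoolExecutor instead of multiprocessing.dummy.Pool. This is a minimal sketch, not a tuned implementation: the name pzip comes from the question, while max_workers, _DONE and _next_or_sentinel are illustrative names I made up. It uses a sentinel rather than catching StopIteration, so nothing has to propagate out of the worker threads.

```python
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # illustrative sentinel marking an exhausted iterator


def _next_or_sentinel(it):
    # Return the sentinel instead of raising StopIteration in the worker.
    return next(it, _DONE)


def pzip(*iterables, max_workers=None):
    """Like zip(), but each round of next() calls runs on a thread pool."""
    iterators = [iter(it) for it in iterables]
    if not iterators:
        return  # zip() with no arguments yields nothing either
    with ThreadPoolExecutor(max_workers=max_workers or len(iterators)) as pool:
        while True:
            # Submit one next() call per iterator; they may run concurrently.
            futures = [pool.submit(_next_or_sentinel, it) for it in iterators]
            # Collect in submission order so tuple positions match zip().
            items = [f.result() for f in futures]
            if any(item is _DONE for item in items):
                return
            yield tuple(items)


if __name__ == "__main__":
    for row in pzip(range(3), "abc", [10.0, 20.0, 30.0]):
        print(row)
```

One difference from zip to be aware of: every iterator is advanced in each round, so when the inputs have different lengths, the longer ones may have one extra item consumed and discarded in the final round.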