1

我正在使用 concurrent.futures.ProcessPoolExecutor 并行运行 python 代码。基本上我所做的是

with concurrent.futures.ProcessPollExecutor(max_workers=10) as executor:
    futures = {executor.submit(my_function, i)
               for i in range(n)}
    
    for fut in concurrent.futures.as_completed(futures):
        print(fut.result())

这适用于少量n但对于较大的 n 它会占用大量 RAM。我觉得存储期货集(或列表)正在占用 RAM。因此,我尝试不存储期货集并在 my_function 本身中实现我想要对结果执行的操作。喜欢

with concurrent.futures.ProcessPollExecutor(max_workers=10) as executor:
    for i in range(n) :
        executor.submit(my_function, i)

但仍然占用大量内存。

通过更多的挖掘,我发现了这个。我知道第一个 for 循环提交所有任务,但执行它们需要时间。所以那些提交但未执行的任务将存储在 RAM 中。

直觉上,我明白,不应该一次提交所有的任务,而是随着前面的任务完成逐步提交。我不想在循环中添加任何睡眠/延迟。有没有更好的方法来做到这一点。我真的不明白使用mapmethod 而不是submitchunksize参数的作用以及如何决定分配给它的值。

有没有更好或优雅的方法来做到这一点?还是我完全错了?我以前用过 GNU 并行,它不会导致这么大的 RAM 问题。我想要一个只有 python 的解决方案。

4

1 回答 1

0

解决方案是使用队列来限制未决的并发期货的数量,或者对已完成的期货期货使用定期池来发送一批期货,而不是一次发送所有期货。

这篇文章与您的问题密切相关,可能会帮助您解决问题。

于 2021-10-19T15:43:41.073 回答