问题标签 [process-pool]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
222 浏览

python - 在 jupyter notebook 中使用 .so python 库进行多处理

我正在尝试执行类似于以下的代码:

一般情况下效果很好。但是,当我在module_example中使用特定模块时,此代码会中断。查看这个特定模块的源代码,我意识到它使用导入了一些对象

来自名为 lib.cpython-36m-x86_64-linux-gnu.so 的文件。有谁知道这是否由于某种原因与ProcessPoolExecutor不兼容?非常感谢大家!

编辑:从脚本执行时,代码运行良好。正是在 jupyter notebook 中执行它时才会失败。

0 投票
1 回答
637 浏览

python - 如何将共享/托管字典与进程池一起使用(Python 3.x)

我正在做一个需要我从一些文件中提取大量信息的项目。关于项目的格式和大部分信息对于我要问的内容并不重要。我大多不明白如何与进程池中的所有进程共享这本字典。

这是我的代码(更改了变量名并删除了大部分代码,只需要知道部分):

据我了解,该 lamda 功能应该可以工作。我需要帮助的行是:pool.map(lambda file: getThing(file, 2, map), fileSet1)。它在那里给我一个错误。给出的错误是“AttributeError: Cant pickle local object 'main..'”。

任何帮助,将不胜感激!

0 投票
0 回答
4177 浏览

python - 将 tqdm 与 multiprocessing 一起用于多个进度条

我想监视不同进程的多个工作人员的进度。对于每个子流程,我都有自己的进度条,但它不能与ProcessPoolExecutor执行程序一起正常工作。

当替换为ThreadPoolExecutor(第二行)时,一切正常,所以我认为这可能与一些序列化问题有关。您能否帮助找出tqdm使用多处理时的正确调用。

python --versionPython 3.7.2tqdm==4.31.1,Ubuntu 18.04

0 投票
5 回答
2278 浏览

python - 如何同时计算大文件中的词频?

我需要统计一个 3GB 压缩的英文句子纯文本文件的词频,解压后大约 30GB。

collections.Counter我有一个带有and的单线程脚本gzip.open,需要几个小时才能完成。

由于逐行读取文件比拆分和计数要快得多,因此我正在考虑使用文件阅读器生成行和多个消费者进行拆分和计数的生产者-消费者流,最后将Counters 合并到得到单词出现。

但是,我找不到ProcessPoolExecutor将队列发送到的示例Executor,它们只是map列表中的单个项目。仅提供单线程示例asyncio.Queue

  • 这是一个巨大的文件,所以我无法读取整个文件并获得list之前的计数,因此我无法使用concurrent.futures.Executor.map. 但是我阅读的所有示例都使用固定列表作为开始。

  • 拆分和计数一个句子的时间相当于fork一个进程,所以我必须让每个消费者进程的寿命更长。我不认为mapcan merge Counters,所以我不能使用chunksize>1。因此,我必须给消费者一个队列,让他们继续计数,直到整个文件完成。但大多数示例仅将一件物品发送给消费者并用于chunksize=1000减少fork时间。

你能给我写一个例子吗?

我希望代码向后兼容 Python 3.5.3,因为 PyPy 更快。


我的真实案例是更具体的文件格式:

我需要计算从第 3 列到第 8 列的单列的每个直方图。所以我以词频作为一个更简单的例子。

我的代码是:

csv.DictReader花费最多时间。

cProfile


我的问题是,虽然 gzip 阅读器很快,但 csv 阅读器很快,我需要计算数十亿行。并且 csv 阅读器肯定比 gzip 阅读器慢。

因此,我需要将行分散到 csv 阅读器的不同工作进程中,并分别进行下游计数。在一个生产者和多个消费者之间使用队列很方便。

由于我使用的是 Python,而不是 C,是否有一些用于多处理和队列的抽象包装器?这可以ProcessPoolExecutorQueue班级一起使用吗?

0 投票
1 回答
1260 浏览

python - 在 Python 中使用 ProcessPoolExecutor 的运行调用数不正确

在 Python 的concurrent.futures标准模块中,为什么 a ProcessPoolExecutorismax_workers + 1而不是 a 中的max_workerslike ThreadPoolExecutor?仅当提交的调用数量严格大于池工作进程的数量时才会发生这种情况。

以下 Python 代码片段向 a 中的 2 个工作人员提交了 8 个调用ProcessPoolExecutor

打印这个(3个正在运行的电话;意外,因为有2个工人):








使用 aThreadPoolExecutor打印此(2 个正在运行的调用;预期):








0 投票
1 回答
1811 浏览

python - 为什么 ProcessPoolExecutor 和 Pool 在调用 super() 时会崩溃?

1.为什么下面使用该concurrent.futures模块的Python代码永远挂起?

该调用引发了一个不可见的异常[Errno 24] Too many open files(要查看它,请将行替换为executor.submit(super().f)print(executor.submit(super().f).exception())

但是,按预期替换ProcessPoolExecutorThreadPoolExecutor“称为”的打印件。

2.为什么下面使用该multiprocessing.pool模块的Python代码会引发异常AssertionError: daemonic processes are not allowed to have children

但是,按预期替换PoolThreadPool“称为”的打印件。

环境:CPython 3.7,MacOS 10.14。

0 投票
1 回答
219 浏览

python-3.x - 使用 ProcessPoolExecutor 时如何正确记忆?

我怀疑是这样的:

@memoize由于我使用的是多处理池并且进程共享不多,因此无法工作(假设是典型的基于 dict 的缓存)。至少它似乎不起作用。

在这种情况下记忆的正确方法是什么?最终,我还想将缓存腌制到磁盘并在后续运行中加载它。

0 投票
2 回答
573 浏览

python - `multiprocessing.Pool.map()` 似乎安排错误

我有一个请求服务器、检索一些数据、处理它并保存一个 csv 文件的函数。此功能应启动 20k 次。每次执行的持续时间都不同:有时会持续超过 20 分钟,有时会持续不到一秒。我决定继续multiprocessing.Pool.map并行执行。我的代码如下所示:

看看是如何prints生成的,它似乎long_list_of_filenames被分成 8 个部分并分配给每个部分,CPU因为有时只是在 20 分钟的执行中被阻塞,而在这 20 分钟内没有long_list_of_filenames处理任何其他元素。我所期望的是以mapFIFO 样式安排 cpu 核心中的每个元素。

我的情况有更好的方法吗?

0 投票
4 回答
9843 浏览

python - 星图与tqdm结合?

我正在做一些并行处理,如下:

其中输入看起来像: [(1,0.2312),(5,0.52) ...] 即,int 和 float 的元组。

代码运行良好,但我似乎无法将其包裹在加载栏(tqdm)周围,例如可以使用例如 imap 方法完成,如下所示:

星图也可以这样做吗?

谢谢!

0 投票
1 回答
224 浏览

python - pathos pools:在 N 个任务后更新工作进程

我正在构建一个并行 python 应用程序,它本质上调用了一个围绕外部库的 C 包装器。需要并行性才能在所有 CPU 内核上同时运行计算。

我最终使用了pathos.multiprocessing.ProcessPool,但这些池缺少maxtaskperchild标准multiprocessing.Pool类构造函数的参数(请参阅此处的参考资料)。我需要这个功能,因为 C 库依赖于进程时钟来定义一些执行时间限制,这些时间限制最终会在任务堆积起来时达到。

有没有办法让ProcessPool经理在给定数量的任务后更新工作流程?

阐明我的意图的示例代码:

我试过的

  • maxtasksperchild在构造函数中设置ProcessPool(参见上面的朴素示例)似乎没有做任何事情
  • 调用sys.exit()worker函数使程序挂起
  • 我在深入研究源代码时发现了一些提示