问题标签 [process-pool]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
200 浏览

python - Python Concurrent.futures CPU 使用率查询

在 Python 中使用 concurrent.futures 处理大型数据集( 9x200,000x4 np.floats )时,我注意到一开始 CPU 使用率很低(13% ~ 相当于使用了 1 个核心)。然而,过了一会儿,它达到了我对多处理的期望(80-90%)。如果有兴趣,这是我的代码片段。

有谁知道这样做的目的?随着我的粒子数量增加,这 1 个核心做某事所需的时间似乎呈指数增长。我的第一个想法是内存分配,因为我预计此运行将占用大约 1-1.5GB,但 python 文档中似乎没有关于此过程的任何内容,我想知道我是否错误地实现了该模块。我已经在相对较低的数据集(10,000 - 100,000)上对此进行了测试,并且肯定看到使用 1 个核心的持续时间有所增加。

非常感谢

一个

0 投票
1 回答
704 浏览

python - Windows 上的 Multiprocessing.pool.Pool:CPU 限制为 63?

当使用超过 63 个内核的 python 的 multiprocessing.pool.Pool 时,我得到ValueError

输出:

该程序似乎运行良好;结果正是我所期望的。我可以忽略ValueError吗?

背景:我已经用谷歌搜索了这个问题,它似乎与 Windows 上的 python 限制有关(_winapi.WaitForMultipleObjects);参见例如这里。建议的解决方法是将使用的内核数限制为 63。这并不令人满意,因为我想在我的服务器上使用 100 多个内核。我真的需要限制核心吗?为什么?有解决方法吗?

0 投票
0 回答
33 浏览

python - 使用完成回调处理在单独进程中运行的异步作业中的中断

我在单独的进程中运行大量 CPU 密集型计算,并希望在每个作业完成时执行一些回调。为此,我将asyncio与 a结合使用,并在每个viaconcurrent.futures.ProcessPoolExecutor上设置回调:Futureadd_done_callback

这很好用,除非计算被中断,例如,KeyboardInterrupt然后stderr会收到来自每个子进程的消息的垃圾邮件说RuntimeError: Event loop is closed这个问题的答案建议使用 Python 3.7+函数,因为它处理中断后的清理,但这似乎不适用于在单独进程中运行的作业。asyncio.run

RuntimeError在 a 之后发出的其中一个 s 的回溯KeyboardInterrupt指向可能的原因:

我的猜测是,当asyncio.run在处理中断时取消正在运行的任务时,回调在每个Future单独的进程中被调用,但asyncio.run在这些回调运行之前关闭事件循环,因为它没有被回调执行阻塞。

编辑:我在创建期货时注释掉了该add_done_callback块,这不会改变结果 - 表明问题毕竟与回调无关,而是与此处看到的相同问题。

0 投票
1 回答
863 浏览

python - 共享数据集和多个参数上的 ProcessPoolExecutor

我遇到了一个无法通过在网络上进行搜索来解决的问题。

我正在使用下面的最小代码。目标是通过多处理(使用 ProcessPoolExecutor)运行一些函数“f_sum”数百万次。我正在通过元组“args”列表添加多个参数。此外,该函数应该使用某种对所有执行都相同的数据(在示例中它只是一个数字)。出于内存原因,我不想将数据添加到“args”元组中。

到目前为止,我发现的唯一选择是将数据添加到“if name == ' main '”之外。这将(出于某种我不明白的原因)使变量可用于所有进程。但是,无法更新。此外,我真的不想在外部进行数据定义,因为在实际代码中它将基于数据导入并且可能需要额外的操作。

希望您能帮助并提前致谢!

PS:我在 Win 10 上使用 Python 3.7.9。

编辑原始问题:

>> 性能示例1

>> 输出

如果使用 randint 会导致错误的结果。对于 linspace 结果是正确的。

>> 性能示例 2 - 基于回答中的建议

>> 输出

@Booboo

我在原始问题中添加了两个示例,“性能示例 2”基于您的代码。第一个有趣的发现是,如果数据数组是用随机整数初始化的,我的原始代码实际上会给出不正确的结果。我注意到,每个进程本身都会初始化数据数组。由于它基于随机数,因此每个进程使用不同的数组进行计算,甚至与主进程不同。因此,该用例不适用于此代码,在您的代码中它始终是正确的。

但是,如果使用 linspace,它会起作用,因为每次都会给出相同的结果。对于从文件中读取一些数据的用例(这是我的实际用例)也是如此。示例 1 仍然比示例 2 快大约 3 倍,我认为时间主要用于在您的方法中初始化数组。

关于内存使用,我在任务管理器中看不到相关差异。即使形状不同,这两个示例都会产生类似的记忆增加。

我仍然相信您的方法是正确的方法,但是,在上面的示例中,内存使用情况似乎相似并且速度较慢。

0 投票
0 回答
108 浏览

python-3.x - 在 Python 中并行运行两个函数

目前我有一个程序可以扫描所有可用的 ble 设备,然后将其更新到列表 bt_addrs。然后将该列表传递给一个名为 data() 的函数,该函数执行该工作。要查看列表中的所有元素,我使用了 ProcessPoolExecutor。

我面临的一个问题是扫描只发生一次,然后就停止了。我想让扫描功能连续执行,这样如果附近有任何新设备,它将被添加到列表中,并且列表应该得到更新,并且数据功能应该在更新后的列表上并行工作。

拥有关于 Python 和多处理的中级知识是一个高级主题,帮助肯定会帮助我增加知识并帮助我获得整个程序的预期结果。如果您有任何其他方法,请告诉。

我在 Raspberry Pi 4-四核 Arm v8 处理器,python 版本 3.7.3 上运行此代码

以下是我目前拥有的代码

0 投票
0 回答
126 浏览

python - Python ProcessPoolExecutor 进程在完成所有任务之前退出

我正在运行一些代码来处理各种输入的大量数据,但在执行几分钟后,只剩下 1 或 2 个进程(当我从终端调用“top”时看到)

更多细节:我有一个主进程加载数据,将其分成组,然后启动进程池执行器并在生成器对象上使用“.map”,该生成器对象为每个进程生成每组数据以处理然后保存。我的多处理代码如下所示:

我在具有许多内核的服务器上运行我的代码,因此工作人员数量众多。

对于某些上下文,“生成器”产生我定义的类的对象,这些对象包含我希望处理的数据,并包括处理和保存数据的内部方法,这些方法由call dunder 方法调用。callObj 函数实际上只是接收一个对象并调用它:

回到问题:我的问题是,一旦加载了数据,并且我从终端调用“top”,顶级进程就会被 Python 进程填满,并且执行速度非常快。但是,活动的 python 进程的数量逐渐下降,直到大约 10-15 分钟后,只有一两个实际执行我的代码!

  • 生成器继续产生要处理的对象,我知道这是因为一两个进程确实保持活动状态,继续处理它们,所以任务队列不为空

  • 查看“顶部”列表中剩余两个下的接下来的几个进程,它们都最多占用 2% 的 CPU,所以它不像有其他一些大型任务占用我的处理能力......可能是什么继续?

  • 我不确定每个进程是否完成一项任务并退出,或者退出前的几个,不太知道如何检查这个..

0 投票
1 回答
37 浏览

python-3.x - 在 ProcessPoolExecutor 中显示正在运行的执行程序标识

这是我遇到的问题。由于机密性,我不会共享代码,而是提供一些虚拟示例。

假设我们有一个类如下:

现在让我们说在其他模块中我们有这个:

所以问题从这里开始。我想知道哪些实例在各自的进程中运行 doSomething() 函数。我想在该实例的函数在进程中运行时设置一个变量 = 1,并在它完成时将其设置为零。

每个实例都有自己的名称和 ID。有没有办法找出进程中正在运行的实例的名称?

这个问题让我很困惑,找不到合适的解决方案。

十分感谢。

0 投票
0 回答
90 浏览

python-3.x - 具有大块大小的多处理 pool.imap 正在跳过处理可迭代中的某些记录

我正在使用带有传递块大小的多处理 pool.imap() 处理字符串列表。我的列表长度为 1821,imap 中的进程为 4。我试图为每个进程提供几乎相等数量的块大小,因此将块大小设置为 455。也尝试使用 500。但这使我的 imap 跳过了一些记录。跳过也不是那么随机,因为它是有序列表。一旦我将块大小更改为 200,imap 就开始将所有记录发送到我的目标函数。有人可以解释为什么 chunksize > 450 在这里引起问题,而根据文档,理想情况下,它应该在每个进程中划分为 1821/4 = 455 或 456 rec。另请注意,在我的函数中,我正在使用该字符串并运行一些步骤,每个步骤需要几秒钟。

0 投票
1 回答
504 浏览

python - 我遇到了来自 concurrent.futures 的 ProcessPoolExecutor 的问题

我有一个大代码需要一段时间来进行计算,我决定学习多线程多处理,因为我的处理器只有 20% 用于进行计算。在多线程没有任何改进之后,我决定尝试多处理,每当我尝试使用它时,即使在非常简单的代码上也只会显示很多错误。

这是我的大型计算繁重代码开始出现问题后测试的代码:

在我遇到的错误消息中说

这不是全部信息,因为它非常大,但我认为我可能会有所帮助以帮助我。错误消息中的几乎所有其他内容都类似于“行中的错误 ... in ...”

如果它可能有帮助,大代码位于:https ://github.com/nobody48sheldor/fuseeinator2.0 它可能不是最新版本。

0 投票
0 回答
41 浏览

python - 导入函数和使用 ProcessPoolExecutor 时出现 ModuleNotFoundError

我正在尝试使用 ProcessPoolExecutor 并行执行 python 脚本(此处称为 file_2.py)。该脚本使用我在另一个文件(此处称为 file_1.py)中编写的函数。这两个文件都在同一个目录中。

如果我重视功能并按顺序运行脚本,一切都很好。此外,当我将函数从 file_2.py 复制到 file_1.py 时,脚本并行运行顺利。但是,当我尝试结合使用 ProcessPoolExecutor 导入函数时,出现以下错误:

ModuleNotFoundError:没有名为“file_1”的模块。

如果有人知道如何解决这个问题,我会很棒!提前致谢 :)

这是我的文件的简化版本: