问题标签 [process-pool]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
446 浏览

python - 如何解决 ProcessPoolExecutor 产生的 PicklingError 应用于单个函数,而 ThreadPoolExecutor 工作正常 [concurrent.futures]

我想同时运行一个简单的函数,它将进程的输出写入 txt.file,然后将其存储到 DBFS(Databricks 文件系统)。在我的示例中,我同时使用了 ThreadPoolExecutor 类()和 ProcessPoolExecutor 类(),尽管 ThreadPoolExecutor 类成功运行,而第二个类生成了酸洗错误。我想用这两个类运行我的函数。如何解决 PicklingError?

请在下面找到我运行以复制我的问题的代码,

如果您在本地而不是在 databricks 集群中运行它

创建 spark df 和参数

方法 1 | 使用 ThreadPoolExecutor 类 - 完美运行

方法 2 | 使用 ProcessPoolExecutor 类 - 为 wait_on_device() 函数生成酸洗错误

使用 ProcessPoolExecutor 类,我得到一个 PicklingError: 在此处输入图像描述

在一般测试 ProcessPoolExecutor 的这个应用程序时,它一直在函数 wait_on_device() 上给我一个泡菜错误

如何解决酸洗错误?我已经搜索了各种方法,比如使用类对主函数进行全局调用,或者通过创建一个函数,import copyreg as copy_reg尽管它们都不能解决我的问题,可能是因为我没有正确创建它们。

到目前为止我的方法
正如@Steven Bethard在这里介绍的那样

但是 PicklingError 仍然存在。

[更新]---当我在 Databricks 上运行代码时生成上述 PicklingError ...在 Jupyter Notebook 的机器上本地运行相同的代码我只收到以下错误与 ProcessPoolExecutor,

在此处输入图像描述 在此处输入图像描述 在此处输入图像描述

我搜索过的其他相关问题但无法应用他们的解决方案。 相关问题1
相关问题2
相关问题3
相关问题4

0 投票
1 回答
107 浏览

python - 尝试运行 docplex 示例时出现 BrokenProcessPool 错误

我正在尝试与 python 进程池并行运行一些 cplex 模型。我试图在我的 Windows 10 -spyder 3.6.9 上将作为带有 docplex 的进程池示例运行。当我运行时,我收到此错误:

我尝试使用不同的机器并将 max_worker 设置为 1,但它没有帮助。

编辑: 我把我正在使用的代码让它更清楚。这是我的 process_pool.py:

其中一个是 zoomontecarlo2.py,其中包含 cplex 模型并使用 process_pool:

当引擎进入“for future in concurrent.futures.as_completed(future_to_i)”循环时,内存信息为:

svmem(总数=17091981312,可用=9288286208,百分比=45.7,使用=7803695104,免费=9288286208)

当它到达“res = future.result()”时,它会因上述错误而崩溃。

0 投票
0 回答
65 浏览

python-3.x - 我的 ProcessPoolExecutor 没有运行整个函数

我需要与不同的进程并行运行一个函数。我正在使用下面的代码。我的函数参数类型是(二进制数的嵌套列表,int)。问题是当我运行脚本进程开始运行但它们在函数中间完成(检查我在代码中放置“这里”的位置)没有错误。

0 投票
1 回答
158 浏览

python - Tornado ProcessPoolExecutor:如何限制进程数

我正在 Tornado 中编写一个 REST 服务器。我使用ProcessPoolExecutor带有可max_workers配置参数的 a 。但是,问题在于它似乎并没有按照我想要的方式有效地限制进程的数量。代码是

submit当池中同时活动的最大进程数大于例如 4 时,我需要生成异常。您有什么想法吗?

0 投票
1 回答
345 浏览

python - 使用顺序初始化器参数进行多进程池初始化

我有一些如下代码:

就问题而言, processing_function 和 arg_list 不相关。

问题是我得到了一个错误:

有没有办法在池中创建一个初始化进程,使得初始化它的参数中的一个是序列中的下一个数字?

PS 在编写的代码中,可以在初始化函数之外初始化所有连接对象,但在我的特定实例中并非如此。我需要将连接参数传递给初始化程序。

0 投票
3 回答
1906 浏览

python - os.sched_getaffinity(0) 与 os.cpu_count()

因此,我知道标题中两种方法之间的区别,但不知道实际含义。

据我了解:如果您使用的 NUM_WORKERS 多于实际可用的内核,您将面临性能大幅下降,因为您的操作系统会不断地来回切换以保持并行。不知道这是多么真实,但我在这里从比我聪明的人那里读到它。

在它的文档中os.cpu_count()说:

返回系统中的 CPU 数量。如果未确定,则返回 None。这个数字不等于当前进程可以使用的 CPU 数量。可以使用 len(os.sched_getaffinity(0)) 获得可用 CPU 的数量

因此,我试图弄清楚“系统”指的是一个进程是否可以使用比“系统”中更多的 CPU。

我只想安全有效地实现multiprocessing.pool功能。所以这是我总结的问题:

有什么实际意义:

-1是因为我发现如果我在处理数据时尝试工作,我的系统会少很多。

0 投票
2 回答
492 浏览

python - 如何从multiprocessing.Pool.map的worker_funtion内部为数组赋值?

基本上我想要的是将那些插入2到.ararworker_function

到目前为止,我唯一能做到的就是改变ar内部worker_function而不是外部的副本:

0 投票
1 回答
702 浏览

python - 如何将二维数组作为 multiprocessing.Array 传递给 multiprocessing.Pool?

我的目标是将父数组传递给mp.Pool并用 s 填充它,2同时将其分配给不同的进程。这适用于一维数组:

输出:

但是我怎样才能对 x 维数组做同样的事情呢?向 中添加维度arr

产生错误:

更改dtypeofarr也无济于事。

0 投票
1 回答
419 浏览

python - Python 3.8 - concurrent.futures.ProcessPoolExecutor 性能随时间下降

我正在尝试并行化我的一个匹配函数,它在一开始就起作用。很高兴看到我的 72 核 ec2 实例正在杀死它,大约一分钟左右它回到单核并且每秒迭代开始下降。

一开始我看到这个

多核处理

然后到这个

单核掉线

大约一分钟,处理比单核还不错。在多处理时,它每秒迭代大约250 次,然后下降到每秒 35 次

非常感谢任何指导。

编辑 - 附加信息 - 我原来的功能:

0 投票
1 回答
196 浏览

python - 向 concurrent.futures.ProcessPool 中的所有进程提交代码以执行

语境:

  • 使用 aconcurrent.futures.process.ProcessPool执行代码的 Python 应用程序服务器
  • 我们有时想在不重启整个服务器进程的情况下热重载导入的代码

(是的,我知道importlib.reload警告

为了让它工作,我想我必须在进程池管理的importlib.reload每个multiprocessing进程中执行。

有没有办法向进程池中的所有进程提交一些东西?