问题标签 [process-pool]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
34 浏览

python-asyncio - asyncio:只能在前一个任务达到预定义阶段时启动任务吗?

我从希望应用于以下问题的 asyncio 开始:

  • 数据被分割成块。
  • 块是第一次压缩。
  • 然后将压缩的块写入文件中。
  • 所有块都使用一个文件,所以我需要一个一个地处理它们。

从这个上下文来看,为了更快地运行这个过程,write当前迭代的步骤一开始,compress下一次迭代的步骤是否也会被触发?

我可以这样做吗asyncio,保持类似的for循环结构?如果是的话,你能分享一些关于这个的建议吗?

我猜另一种并行运行的方法是使用相位并将相位与相位ProcessPoolExecutor完全分开。这意味着在不同的执行程序中压缩第一个所有块。compresswrite

只有当所有块都被压缩后,才开始写入步骤。asyncio但是,如果有意义的话,我想用第一种方法来研究第一种方法。

提前感谢您的帮助。最好的

0 投票
1 回答
68 浏览

python - 如何在多个参数上正确启动两个函数的并行执行?

我正在寻找一种方法来并行启动两个函数,每个函数都在一组给定的不同参数上执行。我用它pool.map来实现这一点。我创建了两个不同的进程,每个进程启动一个执行map. 这行得通 - 执行顺序有点狂野,但我会把它留到另一个问题上。

现在我在这里也找到了另一种方法(见第一个答案)。他们只使用一个池并map_async连续调用两次。所以我想知道,是否有这样做的首选方法?因为我读过(遗憾的是我不记得在哪里)最好只使用一个池,这意味着第二种方法(只使用一个池)更好。但是当我测量时间时,第一种方法(在不同的进程中使用两个池)实际上要快一点。此外,在第一种方法中,函数实际上是并行运行的,而在第二种方法中,首先执行第一次调用map_async,然后执行第二次调用。

这是我的测试代码:

再说一次,我的问题是:有没有一种方法比另一种更受欢迎?或者甚至可能有其他/更好的方法来做到这一点?

0 投票
0 回答
40 浏览

python - 如何在不等待其他进程的情况下使用 ProcessPoolExecutor 返回值?

我有下一个使用 ProcessPoolExecutor 的代码:它使用 function 生成随机值,run()如果 value < 5 则返回它并打印它:

问题是只有在其他进程完成时才会打印值,因为Value 3 < 5, Process: 18404在函数中打印时它会等待一段时间在 main 中打印值,因此日志如下所示:

事实上,我需要在生成值后立即获取并打印它们。怎么做 ?

0 投票
0 回答
22 浏览

python - 如何从一个主异步进程启动多个异步进程

好的,我可能会让这变得更复杂,但我有一个类:

我比有另一个类:

我想WorkerManager使用 asyncio 运行,并且WorkerManager我想为列表中的每个单词启动一个单独的进程,ProcessPoolExecutor并使用Workerfromworker_pool来处理列表中的每个项目。我怎么做??

0 投票
1 回答
54 浏览

python - 如何在主异步循环内的同步子进程中运行多个异步循环?

我有一个我运行的主函数,asyncio然后在它里面我曾经event_loop.run_in_executor()运行一个阻塞函数的多个进程。

我想做的是在每个进程中为每个进程运行一个新的 asyncio 循环,以便执行异步代码。

所以我有一个主要的异步函数,我在其中创建多个进程,并且我想在每个进程中创建一个新的 asyncio 循环。如何才能做到这一点?

0 投票
0 回答
36 浏览

python - concurrent.futures.ProcessPoolExecutor() 不适用于文件

我正在做一个人脸识别项目并使用https://face-recognition.readthedocs.io/en/latest/ lib。我写了一些使用这个库并识别 1 张照片的函数。然后我想将此功能并行化到所有照片。为此,我正在使用 concurrent.futures.ProcessPoolExecutor()

当 self.unknown_list 是照片名称列表时,该程序运行良好。但是当我试图提供文件对象(对项目更有用)就像 [open("imgname.format", "rb")] 的列表时,它对并行化没有任何作用。甚至不启动 self.recognize 功能。我验证了 lib 可以正确处理文件。同样的代码也可以在没有并行化的情况下完美运行。但是当我尝试这样做时,程序只是卡住而没有任何错误。提前致谢

0 投票
0 回答
19 浏览

python - ProcessPoolExecutor中频繁出现BrokenProcessPool错误

对于数据提取作业,我使用 ProcessPoolExecutor 从存储中提取数据,一旦提取达到内存中记录的阈值数量,提取的数据集通过 ProcessPoolExecutor 移交给另一个进程,然后主进程/迭代器继续获取记录。通过 ProcessPoolExecutor 生成的进程将转换、过滤接收到的数据集中的记录。

在处理较大的数据集时,我经常会经常收到以下 BrokenProcessPool。以此为提示(更大的数据集),调整相关参数,这个错误会延迟一点,但最终会抛出错误。由于缺乏信息,它仍然是一个不确定的谜题,无法确定它何时/哪个点会再次发生。

我确实查看了其他问题,但无法将其用作信息。

更具体地说,这里是 queues.py 中引发 queue.Full 错误的代码片段。

我不清楚在什么情况下会引发此错误,

  1. 是因为信号量不足吗?
  2. 是由于快速生产者和慢消费者的场景导致队列被填满吗?如果是这种情况,是否可以阻止写入队列的进程?
  3. 有没有办法知道队列中的可用空间?这样可以在编写器成功将元素放入队列之前对其进行限制吗?

关于如何确定性调试的任何想法或指示?

0 投票
1 回答
87 浏览

python - 在 docker 中使用 python pathos 多处理并行化代码

我在 30 个 CPU 上并行化代码,并确认在容器外部使用 python 库“pathos”可以正常工作。

但是,在 Docker 容器中将代码作为 Flask 应用程序的一部分运行时,它不起作用。我有三个容器:

  • 烧瓶应用程序,
  • 我用来将所有繁重的处理卸载到工作进程的redis,
  • 工作进程。

worker进程的代码可以总结为:

当我运行应用程序时,工作容器中代码的并行化部分使用的 CPU 永远不会超过 4 个。我在 docker stats 和htop. docker-compose yaml 文件中的容器没有 cpu 使用限制。

htop显示代码在任何时候仅在 4 个 cpu 上运行,但实际上它在任务期间随机切换使用哪个 cpu,因此工作容器可以访问所有 48 个 cpu。

摘要:使用此多处理代码运行应用程序很有帮助,但 CPU 使用率存在上限。

0 投票
0 回答
10 浏览

python - 如何在 Linux 中管理进程的内存使用情况?

我正在运行一个实际上是服务器的程序,您在其中输入作业请求,服务器在进程中计算繁重的计算(使用 a ProcessPoolExecutor),然后将结果保存在服务器上,用户可以检索该结果。

对服务器的某些输入会导致执行一个尝试消耗所有内存的进程。在 Windows 上,这不会导致脚本崩溃。然而,在 Linux 上,它确实如此(并导致我运行的整个服务器崩溃)。最终,我想做到这一点,以便任何会消耗所有内存的输入都将在进程完成之前被拒绝。但是,现在,我想更好地管理内存,以便它以类似于在 Windows 上运行的方式运行,或者只是杀死任何超过阈值内存使用量(例如 1 GB)的进程

有没有办法做到这一点?

0 投票
0 回答
47 浏览

python - Python 进程池执行器和命名元组导致单元测试错误->“无法腌制 '_io.TextIOWrapper' 对象”

回溯快照

我正在尝试将namedtuple与进程池执行器一起使用,并且在编写单元测试时,我不断收到错误代码中的dut代表我存储了2个函数的python文件,我从单元测试中调用它

下面是我的代码

这会导致错误,如下所示 Traceback(最近一次调用最后一次): TypeError: cannot pickle '_io.TextIOWrapper' object

当我用这个运行单元测试时,它会导致你看到的错误

下面是堆栈跟踪,错过了它 concurrent.futures.process._RemoteTraceback: