问题标签 [concurrent.futures]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
459 浏览

python - Python dict的并行查找

2 个或更多进程是否可以访问单个 Python 字典或列表?这些进程执行只读访问,但它们会在某些时候同时访问列表/字典的相同元素。我使用标准库的concurrent.futures模块尝试了以下操作,但执行时间非常慢。

0 投票
1 回答
2427 浏览

python - 我可以在 Future 中使用 ProcessPoolExecutor 吗?

我有一个接受列表的程序。对于这个列表中的每个值,它检索另一个列表并处理这个另一个列表。

基本上,它是一个 3 深度树,需要在每个节点上进行可能昂贵的处理。

每个节点都需要能够处理其子节点的结果。

我想做的是map从第一层的输入list到每个节点的结果。不过,在每一个过程中,我都希望map得到下一层的结果。

我担心的是每一层都有自己的最大工人数量。如果可能,我希望他们共享一个进程池,否则所有进程切换都会影响性能。

有没有办法,使用concurrency.futures或其他方法,让每一层共享同一个进程池?

一个例子是:

这样,每个子进程都会从同一个池中抽取。

或者,我可以做类似的事情(但不完全是)

import concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor

并且每次调用都executor从同一个进程池中提取?

0 投票
3 回答
9465 浏览

python - 多进程在进程之间共享不可序列化的对象

三个问题可能重复(但过于具体):

通过回答这个问题,可以回答所有其他三个问题。希望我说清楚:

一旦我在由多处理创建的某个进程中创建了一个对象:

  1. 如何将该对象的引用传递给其他进程?
  2. (不那么重要)我如何确保在我持有参考时这个过程不会死掉?

示例 1(已解决)

示例 2

假设f返回一个状态可变的对象。这个相同的对象应该可以从其他进程访问。

示例 3

我有一个对象,它有一个打开的文件和一个锁 - 我如何授予对其他进程的访问权限?

提醒

我不希望出现此特定错误。或针对此特定用例的解决方案。该解决方案应该足够通用,以便在进程之间共享不可移动的对象。对象可能在任何进程中创建。使所有对象都可移动并保留身份的解决方案也很好。

欢迎任何提示,任何指向如何实现解决方案的部分解决方案或代码片段都是值得的。所以我们可以一起创造一个解决方案。

这是解决此问题但没有多处理的尝试: https ://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst

问题

您希望其他进程如何处理引用?

引用可以传递给使用多处理创建的任何其他进程(重复 3)。可以访问属性,调用引用。访问的属性可能是也可能不是代理。

仅使用代理有什么问题?

也许没有问题,只有挑战。我的印象是代理有一个管理器,并且一个管理器有自己的进程,因此不可序列化的对象必须被序列化和传输(使用 StacklessPython/fork 部分解决)。还存在特殊对象的代理 - 为所有对象(可解决)构建代理很难但并非不可能。

解决方案?- 代理+经理?

Eric Urban 表明序列化不是问题。真正的挑战在于示例 2 和 3:状态的同步。我对解决方案的想法是为经理创建一个特殊的代理类。这个代理类

  1. 接受不可序列化对象的构造函数
  2. 获取一个可序列化的对象并将其传输到管理器进程。
  3. (问题)根据1.不可序列化对象必须在manager进程中创建。
0 投票
1 回答
4317 浏览

python - ThreadPoolExecutor 线程数

我正在尝试使用 futures backport 包在 Python 中使用 ThreadPoolExecutor。然而,问题是所有线程都是同时执行的,因此没有发生实际的池化。更具体地说,我得到了该函数的 10 个线程,而不是 5 个线程,然后是其他线程。我使用下面的代码你发现有什么问题还是只是向后移植的实现?谢谢!

0 投票
2 回答
6804 浏览

python - asyncio yield from concurrent.futures.Future of an Executor

我有一个long_task运行繁重的 cpu-bound 计算的函数,我想通过使用新的 asyncio 框架使其异步。生成的long_task_async函数使用 aProcessPoolExecutor将工作卸载到不受 GIL 约束的不同进程。

问题在于,由于某种原因,当 yield from 时concurrent.futures.Future返回的实例ProcessPoolExecutor.submit会抛出一个TypeError. 这是设计使然吗?asyncio.Future那些期货与阶级不兼容吗?什么是解决方法?

我还注意到生成器不可腌制,因此向 couroutine 提交ProcessPoolExecutor会失败。有什么干净的解决方案吗?

0 投票
2 回答
1975 浏览

python - Why doesn't concurrent.futures make a copy of arguments?

My understanding was that concurrent.futures relied on pickling arguments to get them running in different processes (or threads). Shouldn't pickling create a copy of the argument? On Linux it does not seem to be doing so, i.e., I have to explicitly pass a copy.

I'm trying to make sense of the following results:

Here's the code:

Where on earth is [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] coming from? That's not even one of the sequences passed to submit.

The results differ slightly under Python 2.

0 投票
1 回答
222 浏览

scala - 分配过多内存时,Scala Future 不返回任何内容

使用 Scala-IDE 3.0.3(基于 Scala 2.10.4),以下代码通过显示List未来计算的前 10 个值以及Future completed消息正确完成:

但是,将范围更改List.range(1, 50)List.range(1, 5000)不显示任何内容并且(Failure未触发)。从逻辑上讲,它似乎与内存问题有关,但我不明白那里发生了什么。

更奇怪的是,在 REPL 中运行此代码不会导致问题。我在那里想念什么?

0 投票
1 回答
58 浏览

scala - Scala 和限时处理

我正在学习 Michael Genesereth 的通用游戏 (GGP) Coursera 课程。在 GGP 中,玩家可以在固定的时间内移动。我正在用 Scala 编写我的播放器。(底层 GGP 代码库是 Java。)Scala 是否提供任何支持(或是否有任何 Scala 库)可用于帮助确保计算在给定时间内响应。

0 投票
1 回答
596 浏览

java - 缓存线程池无限增长

我正在尝试在 Java 中实现一个缓存线程池来读取来自总线的数据。一切都很好......只要我在那辆公共汽车上有数据。

如果我断开数据线,程序开始从池中生成无限线程。我间歇性地检查 /proc/{PID}/fd 并在大约一个小时内从 8 变为 100+。最终系统崩溃。

我正在为这些线程(30 秒)提交超时值,它们确实触发了我在日志文件中看到的 TimeoutException Catch。这些线程不应该在超时时结束吗?

一些代码:

(我后来提供了一个可调用的)

我只是在学习如何在 Java 中进行并发处理,但据我所知,这些进程应该超时,但看起来他们只是坐在那里等待总线上没有数据的数据。

我错过了什么?

编辑:感谢您帮助我缩小范围。

这是我的可调用对象

0 投票
2 回答
732 浏览

python - 来自 ProcessPoolExecutor 的流式套接字

我正在尝试创建一个 Python 应用程序,其中一个进程(进程'A')接收请求并将其放入 ProcessPool(来自 concurrent.futures)。在处理此请求时,可能需要将消息传递给第二个进程(进程“B”)。我正在使用 tornado 的 iostream 模块来帮助包装连接并获得响应。

进程 A 未能从 ProcessPool 执行中成功连接到进程 B。我哪里错了?

向进程 A 发出初始请求的客户端:

收到初始请求的进程 A:

进程 B,它应该接收来自进程 A 的中继请求:

最后,进程 A 返回的错误,在“read_until”方法期间引发: