问题标签 [dask-distributed]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 使用通过 ssh 隧道分发的 dask 访问散景服务器
问题
我正在设置一个集群来运行图像分析(从 MPI 移动到 Dask 和 Dask.distributed)。我通过隧道连接到主节点,但我不知道如何访问散景服务器。
步骤
1. 通过 ssh 隧道连接到我的服务器主节点:
ssh -L 7000:localhost:7000 simone@server.se
2. 启动dask-scheduler --port 7001 --bokeh 7002
3.ssh
到我要使用的节点(也在端口 7000 上建立隧道)并启动dask-worker --memory-limit=200e9
4. 启动jupyter notebook --port=7000 --no-browser
并打开一个chrome
会话并将浏览器指向localhost:7000
5. 启动Client()
指向调度程序地址
6.的aX11 forwarding
已损坏,我无法在笔记本电脑上使用它
当我查看dask-scheduler
页面的输出时,我得到:
并且client
似乎正确连接到工人:
问题
1) 将浏览器指向port 7000
而不是port 7001
设置时间表的位置是否正确?仅供参考:如果我使用 localhost:7001 或任何调度程序和仪表板的 IP 地址,我无法从浏览器加载任何内容。2) 如何访问散景图以评估性能?
3)额外的好处:有没有一种方法可以让我启动多个工人dask-ssh
并传递参数,例如--memory-limit
谢谢!
dask - 如何构建处理来自队列的固定数量输入的 Dask 应用程序?
我们需要实现以下内容。给定一个将提供已知数量消息的 Redis 通道:
对于从通道消费的每条消息:
- 从 Redis 获取 JSON 文档
- 解析 JSON 文档,提取结果对象列表
聚合所有结果对象以生成单个结果
我们希望将第 1 步和第 2 步分配给许多工作人员,并避免将所有结果收集到内存中。我们还想显示两个步骤的进度条。
但是,我们看不到构建应用程序的好方法,以便我们可以看到进度并保持工作在系统中移动,而不会因为不合时宜的时间而阻塞。
例如,在第 1 步中,如果我们从 Redis 通道读取到队列中,那么我们可以将队列传递给 Dask,在这种情况下,我们开始处理每条消息,而无需等待所有消息。但是,如果我们使用队列,我们就看不到显示进度的方法(大概是因为队列通常具有未知大小?)
如果我们从 Redis 通道收集到一个列表并将其传递给 Dask,那么我们可以看到进度,但是我们必须等待来自 Redis 的所有消息,然后才能开始处理第一个消息。
有没有推荐的方法来解决这类问题?
dask-distributed - 有没有办法从调度程序确认作业已被接受?
我dask.distributed.Client
用来连接到一个远程运行的 Dask 调度程序,该调度程序管理着一群工作人员。我正在使用client.submit
并跟踪返回的内容提交我的工作Future
:
我希望能够知道作业是否/何时已发送到调度程序并被调度程序接受。这样我就可以在调度程序出现故障时添加一些重试逻辑。
是否有一种简单的方法可以确认调度程序已收到并接受该作业?
一些额外的点:
- 我意识到它
distributed.client.Future
有一个status
属性,但我犹豫要不要使用它,因为它没有记录在 API 中。 - 我试过使用
dask.callbacks.Callback
但没有成功。任何有关使用回调的帮助distributed.Client
将不胜感激。
编辑:我也可以让工作在开始时发回通知,但如果客户不支持,我想把这种方法作为最后的手段。
dask - 将 dask 集合异步存储到文件/CSV
我正在使用 dask.distributed 实现各种数据处理管道。通常,原始数据是从 S3 读取的,最终处理过的(大型)集合也会写入 S3 上的 CSV。
我可以异步运行处理并监视进度,但我注意到将集合存储到文件的所有 to_xxx() 方法似乎都是同步调用。它的一个缺点是调用可能会阻塞很长时间。其次,我不能轻易地构建一个完整的图表以供以后执行。
有没有办法异步运行例如 to_csv() 并获取未来对象而不是阻塞?
PS:我很确定我可以自己实现异步存储,例如通过将集合转换为延迟()并存储每个分区。但这似乎是一个常见的情况——除非我错过了已经存在的功能,否则在框架中包含这样的东西会很好。
dask - Dask worker 优雅的任务失败
当我运行 dask.distributed worker 时,任务函数中抛出的任何异常都会传播到调度程序并终止整个作业。有没有办法优雅地失败任务,以便调度程序负责重试它(可能在另一个工作人员身上)?
python - 在 Dask 中序列化大于 2GB 的数据时出错
我正在使用带有一些大中间值的 Dask,并且我从 pickle 中得到错误,如下所示:
这是怎么回事?Dask 不支持序列化大于 2GB 的数据块吗?
dask - Dask 调度程序内存
随着时间的推移和执行的继续,我们的 dask 调度程序进程似乎在内存中膨胀。目前我们看到它使用 5GB 的内存,这似乎很高,因为所有数据都应该存在于工作节点上:
启动调度程序时,我们的内存使用量将低于 1GB。重新启动网络执行 client.restart() 似乎没有帮助,只有杀死调度程序进程本身并重新启动才能释放内存。
每个执行的单个任务的预期内存使用量是多少?调度程序真的只维护指向哪个工作人员包含未来结果的指针吗?
- - 编辑 - -
我认为我主要关心的是为什么 client.restart() 似乎没有释放调度程序进程使用的内存。我显然不希望它释放所有内存,而是回到基本水平。我们正在使用 client.map 在不同输入列表中执行我们的函数。执行后,一遍又一遍地重新启动客户端并拍摄调度程序内存的快照,我们看到以下增长:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
27955 atoz 20 0 670556 507212 13536 R 43.7 6.2 1:23.61 dask-scheduler
27955 atoz 20 0 827308 663772 13536 S 1.7 8.1 16:25.85 dask-scheduler
27955 atoz 20 0 859652 696408 13536 S 4.0 8.5 19:18.04 dask-scheduler
27955 atoz 20 0 1087160 923912 13536 R 62.3 11.3 20:03.15 dask-scheduler
27955 atoz 20 0 1038904 875788 13536 S 3.7 10.7 23:57.07 dask-scheduler
27955 atoz 20 0 1441060 1.163g 12976 S 4.3 14.9 35:54.45 dask-scheduler
27955 atoz 20 0 1646204 1.358g 12976 S 4.3 17.4 37:05.86 dask-scheduler
27955 atoz 20 0 1597652 1.312g 12976 S 4.7 16.8 37:40.13 dask-scheduler
我想我只是惊讶于在执行 client.restart() 之后我们没有看到内存使用回到某个基线。
----进一步编辑---- 关于我们正在运行的更多信息,因为建议是如果我们传递大型数据结构,则将它们直接发送给工作人员。
我们发送一个字典作为每个任务的输入,当 json 转储字典时,大多数都在 1000 个字符以下。
---- 甚至进一步的编辑:转载问题---- 我们今天再次转载了这个问题。我关闭了调度程序并重新启动它,我们有大约 5.4 GB 的可用内存,然后我们运行我将在下面粘贴的函数,跨越 69614 个真正保存一些基于文件的信息的字典对象(我们所有的工作人员都映射到同一个NFS 数据存储,我们使用 Dask 作为分布式文件分析系统。
这是函数(注意:squarewheels4 是一个自制的惰性文件提取和分析包,它使用 Acora 和 libarchive 作为其基础,用于从压缩存档中获取文件并为文件编制索引。)
```
期货的生成方式如下:
futures = client.map(function_name, file_list)
在这种状态之后,我的目标是尝试恢复并让 dask 释放它分配的内存,这是我的努力:在取消期货之前:
在取消期货时:
取消期货后:
做了一个 client.restart() 之后
不管我在分布式系统中运行什么,我的期望是在取消期货之后它至少会恢复到接近正常水平......并且在执行 client.restart() 之后,我们肯定会接近我们的正常基线。我在这里错了吗?
--- 第二次重现 ---- 使用以下步骤重现了行为(尽管不是完全内存耗尽):
这是我的工人功能
我在 68617 次迭代/文件中运行它
在运行之前,我们看到使用了这么多内存:PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 12256 atoz 20 0 1345848 1.107g 7972 S 1.7 14.2 47:15.24 dask-scheduler
运行后我们看到了这么多:
执行完 client.restart 后,我们看到:
dask-distributed - 工人连接,但计算失败
我让 dask-worker 连接到 dask-scheduler。我的问题是在发布任务后出现的。在我看来(在任务流中)工作人员确实执行了计算。来自 dask 工作人员的错误日志很长,我不明白 - 它说超时,连接被拒绝?哪个连接被拒绝?AFAIK 两台机器之间没有防火墙(在局域网上)。
请注意,外观相同/相似的错误会一遍又一遍地发生。最终,计算失败,说明“ValueError:找不到相关的array-original-0effb3cc096e32a82e95557c88b795fd。检查工作日志”
dask - 使用客户端检索、查看结果和取消期货
场景:用户 A 连接到 dask 调度程序,发送一个长作业使用client.map(func, list)
并在周末注销。用户 B 想要查看用户 A 创建的已完成期货的结果,并可能取消挂起的期货。
我们设法在工人中获得了期货的结果,如下所示:
做一个f.cancel()
什么都不做。有没有办法做到这一点?
dask - 分布式工作人员的 Dask 工作人员资源
当您在分布式多处理工作者中定义工作者资源( http://distributed.readthedocs.io/en/latest/resources.html )时,是否为所有进程定义了资源池?
例如,在我正在运行的工作主机上:
现在,如果我提交需要的任务,resources={"HOST":1}
这是否保证该机器上只有一个进程会在特定时刻执行此任务?