问题标签 [dask-distributed]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
908 浏览

python - 使用通过 ssh 隧道分发的 dask 访问散景服务器

问题
我正在设置一个集群来运行图像分析(从 MPI 移动到 Dask 和 Dask.distributed)。我通过隧道连接到主节点,但我不知道如何访问散景服务器。

步骤
1. 通过 ssh 隧道连接到我的服务器主节点:
ssh -L 7000:localhost:7000 simone@server.se
2. 启动dask-scheduler --port 7001 --bokeh 7002
3.ssh到我要使用的节点(也在端口 7000 上建立隧道)并启动dask-worker --memory-limit=200e9
4. 启动jupyter notebook --port=7000 --no-browser并打开一个chrome会话并将浏览器指向localhost:7000
5. 启动Client()指向调度程序地址
6.的aX11 forwarding已损坏,我无法在笔记本电脑上使用它

当我查看dask-scheduler页面的输出时,我得到:

并且client似乎正确连接到工人:

问题
1) 将浏览器指向port 7000而不是port 7001设置时间表的位置是否正确?仅供参考:如果我使用 localhost:7001 或任何调度程序和仪表板的 IP 地址,我无法从浏览器加载任何内容。2) 如何访问散景图以评估性能?
3)额外的好处:有没有一种方法可以让我启动多个工人dask-ssh并传递参数,例如--memory-limit

谢谢!

0 投票
1 回答
162 浏览

dask - 如何构建处理来自队列的固定数量输入的 Dask 应用程序?

我们需要实现以下内容。给定一个将提供已知数量消息的 Redis 通道:

  1. 对于从通道消费的每条消息:

    • 从 Redis 获取 JSON 文档
    • 解析 JSON 文档,提取结果对象列表
  2. 聚合所有结果对象以生成单个结果

我们希望将第 1 步和第 2 步分配给许多工作人员,并避免将所有结果收集到内存中。我们还想显示两个步骤的进度条。

但是,我们看不到构建应用程序的好方法,以便我们可以看到进度并保持工作在系统中移动,而不会因为不合时宜的时间而阻塞。

例如,在第 1 步中,如果我们从 Redis 通道读取到队列中,那么我们可以将队列传递给 Dask,在这种情况下,我们开始处理每条消息,而无需等待所有消息。但是,如果我们使用队列,我们​​就看不到显示进度的方法(大概是因为队列通常具有未知大小?)

如果我们从 Redis 通道收集到一个列表并将其传递给 Dask,那么我们可以看到进度,但是我们必须等待来自 Redis 的所有消息,然后才能开始处理第一个消息。

有没有推荐的方法来解决这类问题?

0 投票
0 回答
45 浏览

dask-distributed - 有没有办法从调度程序确认作业已被接受?

dask.distributed.Client用来连接到一个远程运行的 Dask 调度程序,该调度程序管理着一群工作人员。我正在使用client.submit并跟踪返回的内容提交我的工作Future

我希望能够知道作业是否/何时已发送到调度程序并被调度程序接受。这样我就可以在调度程序出现故障时添加一些重试逻辑。

是否有一种简单的方法可以确认调度程序已收到并接受该作业?

一些额外的点:

  • 我意识到它distributed.client.Future有一个status属性,但我犹豫要不要使用它,因为它没有记录在 API 中。
  • 我试过使用dask.callbacks.Callback但没有成功。任何有关使用回调的帮助distributed.Client将不胜感激。

编辑:我也可以让工作在开始时发回通知,但如果客户不支持,我想把这种方法作为最后的手段。

0 投票
1 回答
223 浏览

dask - 将 dask 集合异步存储到文件/CSV

我正在使用 dask.distributed 实现各种数据处理管道。通常,原始数据是从 S3 读取的,最终处理过的(大型)集合也会写入 S3 上的 CSV。

我可以异步运行处理并监视进度,但我注意到将集合存储到文件的所有 to_xxx() 方法似乎都是同步调用。它的一个缺点是调用可能会阻塞很长时间。其次,我不能轻易地构建一个完整的图表以供以后执行。

有没有办法异步运行例如 to_csv() 并获取未来对象而不是阻塞?

PS:我很确定我可以自己实现异步存储,例如通过将集合转换为延迟()并存储每个分区。但这似乎是一个常见的情况——除非我错过了已经存在的功能,否则在框架中包含这样的东西会很好。

0 投票
1 回答
446 浏览

dask - Dask worker 优雅的任务失败

当我运行 dask.distributed worker 时,任务函数中抛出的任何异常都会传播到调度程序并终止整个作业。有没有办法优雅地失败任务,以便调度程序负责重试它(可能在另一个工作人员身上)?

0 投票
1 回答
551 浏览

python - 在 Dask 中序列化大于 2GB 的数据时出错

我正在使用带有一些大中间值的 Dask,并且我从 pickle 中得到错误,如下所示:

这是怎么回事?Dask 不支持序列化大于 2GB 的数据块吗?

0 投票
1 回答
1153 浏览

dask - Dask 调度程序内存

随着时间的推移和执行的继续,我们的 dask 调度程序进程似乎在内存中膨胀。目前我们看到它使用 5GB 的内存,这似乎很高,因为所有数据都应该存在于工作节点上:

启动调度程序时,我们的内存使用量将低于 1GB。重新启动网络执行 client.restart() 似乎没有帮助,只有杀死调度程序进程本身并重新启动才能释放内存。

每个执行的单个任务的预期内存使用量是多少?调度程序真的只维护指向哪个工作人员包含未来结果的指针吗?

- - 编辑 - -

我认为我主要关心的是为什么 client.restart() 似乎没有释放调度程序进程使用的内存。我显然不希望它释放所有内存,而是回到基本水平。我们正在使用 client.map 在不同输入列表中执行我们的函数。执行后,一遍又一遍地重新启动客户端并拍摄调度程序内存的快照,我们看到以下增长: PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 27955 atoz 20 0 670556 507212 13536 R 43.7 6.2 1:23.61 dask-scheduler 27955 atoz 20 0 827308 663772 13536 S 1.7 8.1 16:25.85 dask-scheduler 27955 atoz 20 0 859652 696408 13536 S 4.0 8.5 19:18.04 dask-scheduler 27955 atoz 20 0 1087160 923912 13536 R 62.3 11.3 20:03.15 dask-scheduler 27955 atoz 20 0 1038904 875788 13536 S 3.7 10.7 23:57.07 dask-scheduler 27955 atoz 20 0 1441060 1.163g 12976 S 4.3 14.9 35:54.45 dask-scheduler 27955 atoz 20 0 1646204 1.358g 12976 S 4.3 17.4 37:05.86 dask-scheduler 27955 atoz 20 0 1597652 1.312g 12976 S 4.7 16.8 37:40.13 dask-scheduler

我想我只是惊讶于在执行 client.restart() 之后我们没有看到内存使用回到某个基线。

----进一步编辑---- 关于我们正在运行的更多信息,因为建议是如果我们传递大型数据结构,则将它们直接发送给工作人员。

我们发送一个字典作为每个任务的输入,当 json 转储字典时,大多数都在 1000 个字符以下。

---- 甚至进一步的编辑:转载问题---- 我们今天再次转载了这个问题。我关闭了调度程序并重新启动它,我们有大约 5.4 GB 的可用内存,然后我们运行我将在下面粘贴的函数,跨越 69614 个真正保存一些基于文件的信息的字典对象(我们所有的工作人员都映射到同一个NFS 数据存储,我们使用 Dask 作为分布式文件分析系统。

这是函数(注意:squarewheels4 是一个自制的惰性文件提取和分析包,它使用 Acora 和 libarchive 作为其基础,用于从压缩存档中获取文件并为文件编制索引。)

```

期货的生成方式如下:

futures = client.map(function_name, file_list)

在这种状态之后,我的目标是尝试恢复并让 dask 释放它分配的内存,这是我的努力:在取消期货之前:

在取消期货时:

取消期货后:

做了一个 client.restart() 之后

不管我在分布式系统中运行什么,我的期望是在取消期货之后它至少会恢复到接近正常水平......并且在执行 client.restart() 之后,我们肯定会接近我们的正常基线。我在这里错了吗?

--- 第二次重现 ---- 使用以下步骤重现了行为(尽管不是完全内存耗尽):

这是我的工人功能

我在 68617 次迭代/文件中运行它

在运行之前,我们看到使用了这么多内存:PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 12256 atoz 20 0 1345848 1.107g 7972 S 1.7 14.2 47:15.24 dask-scheduler

运行后我们看到了这么多:

执行完 client.restart 后,我​​们看到:

0 投票
0 回答
604 浏览

dask-distributed - 工人连接,但计算失败

我让 dask-worker 连接到 dask-scheduler。我的问题是在发布任务后出现的。在我看来(在任务流中)工作人员确实执行了计算。来自 dask 工作人员的错误日志很长,我不明白 - 它说超时,连接被拒绝?哪个连接被拒绝?AFAIK 两台机器之间没有防火墙(在局域网上)。

请注意,外观相同/相似的错误会一遍又一遍地发生。最终,计算失败,说明“ValueError:找不到相关的array-original-0effb3cc096e32a82e95557c88b795fd。检查工作日志”

0 投票
1 回答
191 浏览

dask - 使用客户端检索、查看结果和取消期货

场景:用户 A 连接到 dask 调度程序,发送一个长作业使用client.map(func, list)并在周末注销。用户 B 想要查看用户 A 创建的已完成期货的结果,并可能取消挂起的期货。

我们设法在工人中获得了期货的结果,如下所示:

做一个f.cancel()什么都不做。有没有办法做到这一点?

0 投票
1 回答
684 浏览

dask - 分布式工作人员的 Dask 工作人员资源

当您在分布式多处理工作者中定义工作者资源( http://distributed.readthedocs.io/en/latest/resources.html )时,是否为所有进程定义了资源池?

例如,在我正在运行的工作主机上:

现在,如果我提交需要的任务,resources={"HOST":1}这是否保证该机器上只有一个进程会在特定时刻执行此任务?