1

使用Helm,我创建了一个 Dask 集群。

NAME                               READY   STATUS    RESTARTS   AGE
dask01-jupyter-aaa-aaaa            1/1     Running   0          3d19h
dask01-scheduler-bbb-bbbb          1/1     Running   0          3d19h
dask01-worker-ccc-cccc             1/1     Running   0          3d19h
dask01-worker-ddd-dddd             1/1     Running   0          3d19h
dask01-worker-eee-eeee             1/1     Running   0          3d19h

我可以运行基本的 Dask 工作负载。

import dask.array as da

array = da.ones((1000, 1000, 1000), chunks=(100, 100, 10))

现在,我想以某种方式将它连接到客户端:

from dask import distributed

cluster = None # TODO: configure KubeCluster somehow https://kubernetes.dask.org/en/latest/
client = distributed.Client(cluster)

如果我想启动一个集群,这很有效:

from dask_kubernetes import KubeCluster

cluster = KubeCluster.from_yaml('worker-spec.yml')

但是如何连接到现有集群?

4

2 回答 2

1

Dask Helm Chartdask-kubernetes是两个以不同方式工作的独立项目。它们不兼容在一起。

如果您使用的是作为 Helm Chart 的一部分创建的 Jupyter Notebook,那么一切都已经为您配置好了,您可以使用默认选项创建 Dask 客户端。

from distributed import Client
client = Client()

如果您希望使用不同的 Python 环境,例如本地计算机上的环境,则必须指定调度程序的远程地址。这将根据您配置 Helm Chart 的方式而有所不同。

例如,如果您通过负载均衡器公开您的调度程序,您需要将您的客户端指向它。

from distributed import Client
client = Client('tcp://<load balancer ip>:8786')

如果您获得 Helm Chart 部署的状态,它将在打印的注释中显示有关如何连接到调度程序的信息。

helm status <depoyment name>
于 2020-05-26T10:11:39.830 回答
1

如果您已经安装了 Dask Helm 包,那么您可以使用它kubectl来检索 TCP 连接地址以传递给distributed.Client. 例如,如果您选择使用dask-abc分布式的 Helm 版本名称。在helm install- 请参见此处- 设置集群时,您可以遵循Dask Helm 和 Kubernetes 文档并使用kubectl get serviceswithjsonpath过滤此命令的输出并仅检索 Dask 调度程序服务的 IP 地址(将命名为dask-abc-scheduler)。

是一个使用kubectl get podswithjsonpath获取 pod 名称的类似示例(请参见以 开头的行pods=$()。您必须等到服务的 IP 地址可用,使用--watch标志(请参阅此处的注释部分)

$ export RELEASE_NAME=dask-abc

# wait until load balancer EXTERNAL_IP is available
$ kubectl get services --wait $RELEASE_NAME-scheduler

# get Dask scheduler address
$ dask_scheduler=$(kubectl get services \
      $RELEASE_NAME-scheduler \
      --output=jsonpath='{.status.loadBalancer.ingress[0].ip}')
$ echo $dask_scheduler

然后,dask_scheduler上面打印的地址可以distributed.Client()在您的 Python 代码中传递给的 url 中使用

> client_connection_url = "tcp://<dask-scheduler>:8786"
> client = distributed.client(client_connection_url)
> print(client)
.
.
.
.
于 2020-05-27T21:18:37.797 回答