仍在尝试了解 Dask 和 Kubernetes 如何与其他容器一起工作,所以希望是否有人可以普遍地说以下是否可以工作?具体来说,我不太了解 Dask 分布式是否/如何将数据分发到集群中运行另一个容器的假脱机 pod,该容器解析该数据并将其返回给 dask 以供后续功能使用。在这里,“其他”容器是转换数据的编译程序。
类似于以下内容:
import dask
from dask import delayed, compute
from dask_kubernetes import KubeCluster
cluster = KubeCluster('worker-spec.yml')
cluster.scale(10)
client = Client(cluster)
@delayed
def run_transformer(raw_data):
transformed_data = run_transformer_container(raw_data)
return transformed_data
@delayed
def upload_to_s3(transformed_data):
success = True
[...]
return success
raw_data = [string1, string2, ..., stringN]
output = []
for x in raw_data:
f = run_transformer(x)
g = upload_to_s3(f)
output.append(g)
transformed_data = compute(output)
Dask 延迟处理将 N 个任务分配给 10 个工作节点,然后每个工作节点将raw_data内容(可能是一个字符串,或者可能是一个腌制对象)传递到该工作节点上包含一个容器的假脱机 pod,该容器将摄取和在上传到 S3 之前转换数据并返回解析后的数据(通过未指定的run_transformer_container函数,但是这会起作用)。