以下代码将函数映射到可迭代对象上。应用于每个元素的函数运行一个 docker 容器以计算其返回值:
import subprocess
def task(arg):
return subprocess.check_output(
["docker", "run", "ubuntu", "bash", "-c", f"echo 'result_{arg}'"]
)
args = [1, 2, 3]
for result in map(task, args):
print(result.decode("utf-8").strip())
result_1
result_2
result_3
在 dask 中通过云计算资源并行化这种计算的最简单方法是什么?
例如,如果可以执行以下操作,那就太好了。但这当然不起作用,因为在 Fargate 上执行 python 代码的 docker 容器正在运行默认的 dask 映像,因此没有能力自己生成 docker 容器(我不确定是否有或不是这个“docker-in-docker”方向的解决方案):
import subprocess
from dask.distributed import Client
from dask_cloudprovider import FargateCluster
import dask.bag
def task(arg):
return subprocess.check_output(
["docker", "run", "ubuntu", "bash", "-c", f"echo 'result_{arg}'"]
)
cluster = FargateCluster(n_workers=1)
client = Client(cluster)
args = [1, 2, 3]
for result in dask.bag.from_sequence(args).map(task).compute():
print(result)
我正在寻找一种不涉及在同一个 docker 映像中容纳不相关代码的解决方案。即,我希望我的任务用于其计算的 docker 映像是任意的第三方映像,我不必通过添加 python/dask 依赖项来更改它。所以我认为这排除了基于改变下一个工作节点使用的图像的解决方案dask_cloudprovider.FargateCluster/ECSCluster
,因为这将不得不容纳 python/dask 依赖项。