我已经设置了一个Dask
集群,我很乐意向Prefect
它发送基本流程。现在我想做一些更有趣的事情,并使用我的 python 库获取自定义 docker 映像,并在 dask 集群上执行流/任务。
我的假设是我可以离开 dask 集群(调度程序和工作人员),因为它们使用自己的 python 环境(在检查所有各种消息传递库后到处都有匹配的版本)。也就是说,如果 Flow 在我的 custom 中执行,我不希望将我的库添加到那些机器上storage
。但是,要么我没有正确设置存储,要么假设上述情况是不安全的。换句话说,也许在我的自定义库中腌制对象时,Dask 集群确实需要了解我的 python 库。假设我有一些通用的 python 库,叫做data
...
import prefect
from prefect.engine.executors import DaskExecutor
#see https://docs.prefect.io/api/latest/environments/storage.html#docker
from prefect.environments.storage import Docker
#option 1
storage = Docker(registry_url="gcr.io/my-project/",
python_dependencies=["some-extra-public-package"],
dockerfile="/path/to/Dockerfile")
#this is the docker build and register workflow!
#storage.build()
#or option 2, specify image directly
storage = Docker(
registry_url="gcr.io/my-project/", image_name="my-image", image_tag="latest"
)
#storage.build()
def get_tasks():
return [
"gs://path/to/task.yaml"
]
@prefect.task
def run_task(uri):
#fails because this data needs to be pickled ??
from data.tasks import TaskBase
task = TaskBase.from_task_uri(uri)
#task.run()
return "done"
with prefect.Flow("dask-example",
storage = storage) as flow:
#chain stuff...
result = run_task.map(uri=get_tasks())
executor = DaskExecutor(address="tcp://127.0.01:8080")
flow.run(executor=executor)
谁能解释这种基于 docker 的工作流程如何/是否应该工作?