我的目标:
我有一个构建的 docker 映像,并希望在该映像上运行我的所有流。
目前:
我有以下任务在本地 Dask Executor 上运行。运行代理的服务器与执行所需的 Python 环境不同my_task
- 因此需要在预构建映像中运行。
我的问题是: 如何在 Dask Executor 上运行此流程,以便它在我提供的 docker 映像(作为环境)上运行?
import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
@task
def hello_task():
logger = prefect.context.get("logger")
logger.info("Hello, Docker!")
with Flow("My Flow") as flow:
results = hello_task()
flow.environment = LocalEnvironment(
labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=2),
)
我认为我需要首先在该 docker 映像上启动服务器和代理(如此处所述),但我想有一种方法可以简单地在提供的映像上运行 Flow。
更新 1
按照本教程,我尝试了以下方法:
import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
from prefect.environments.storage import Docker
@task
def hello_task():
logger = prefect.context.get("logger")
logger.info("Hello, Docker!")
with Flow("My Flow") as flow:
results = hello_task()
flow.storage = Docker(registry_url='registry.gitlab.com/my-repo/image-library')
flow.environment = LocalEnvironment(
labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=2),
)
flow.register(project_name="testing")
但这创建了一个图像,然后将其上传到registry_url
提供的图像。之后,当我尝试运行注册的任务时,它会拉取新创建的图像,并且该任务现在停留在状态Submitted for execution
几分钟。
我不明白为什么它会推送图像然后拉取它?相反,我已经在此注册表上构建了一个映像,我想指定一个应该用于执行任务的映像。