5

我的目标:

我有一个构建的 docker 映像,并希望在该映像上运行我的所有流。

目前:

我有以下任务在本地 Dask Executor 上运行。运行代理的服务器与执行所需的 Python 环境不同my_task- 因此需要在预构建映像中运行。

我的问题是: 如何在 Dask Executor 上运行此流程,以便它在我提供的 docker 映像(作为环境)上运行?

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Docker!")


with Flow("My Flow") as flow:
    results = hello_task()

flow.environment = LocalEnvironment(
    labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=2),
)

我认为我需要首先在该 docker 映像上启动服务器和代理(如此所述),但我想有一种方法可以简单地在提供的映像上运行 Flow。

更新 1

按照教程,我尝试了以下方法:

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
from prefect.environments.storage import Docker


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Docker!")


with Flow("My Flow") as flow:
    results = hello_task()

flow.storage = Docker(registry_url='registry.gitlab.com/my-repo/image-library')
flow.environment = LocalEnvironment(
    labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=2),
)

flow.register(project_name="testing")

但这创建了一个图像,然后将其上传到registry_url提供的图像。之后,当我尝试运行注册的任务时,它会拉取新创建的图像,并且该任务现在停留在状态Submitted for execution几分钟。

我不明白为什么它会推送图像然后拉取它?相反,我已经在此注册表上构建了一个映像,我想指定一个应该用于执行任务的映像。

4

1 回答 1

2

我最终实现这一目标的方式如下:

  1. 在服务器上运行prefect server start(即不在 docker 内)。显然 docker 中的 docker-compose 不是一个好主意。
  2. prefect agent start在 docker 镜像中运行
  3. 确保 docker 镜像可以访问流(例如,通过在镜像和服务器之间安装共享卷)

您可以在此处查看我的答案的来源。

于 2020-10-12T20:23:51.843 回答