1

我是 Prefect 的新手,主要使用 Airflow 工作。我整理了一个工作流程,执行得很好,但任务没有按我期望的顺序执行。流在这里:

with Flow(name='4chan_extract') as flow:
    board_param = Parameter(name='board_name', required = True, default='pol')
    getData(board= board_param)
    checkDB(url= 'postgresql://postgres:user@localhost:5434/postgres')
    upload_raw(url="postgresql://postgres:user@localhost:5434/postgres", 
    board=board_param)
    remove_dupes(board=board_param)

然而,当我使用flow.visualise()这个流程时,DAG 看起来真的很奇怪。

我的理解是上下文运算符with设置顺序?在每个任务中使用up_stream都没有帮助。

任何帮助表示赞赏。

4

1 回答 1

2

如果您希望按顺序调用任务,一个接一个,您可以添加upstream_tasks到每个任务。此外,为了轻松传递状态依赖项,您可以在调用任务时为其分配一个名称(data = get_data(board=board_param)),这允许将此命名引用传递给下游依赖项。

我只能猜测您希望此流程看起来如何,但假设您希望它按顺序运行,这里有一个完整的示例和 DAG 可视化:

from prefect import task, Flow, Parameter


@task
def get_data(board):
    pass


@task
def check_db(url):
    pass


@task
def upload_raw(url, board):
    pass


@task
def remove_duplicates(board):
    pass


with Flow(name="4chan_extract") as flow:
    board_param = Parameter(name="board_name", required=True, default="pol")
    data = get_data(board=board_param)
    check = check_db(
        url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
    )
    upload = upload_raw(
        url="postgresql://postgres:user@localhost:5434/postgres",
        board=board_param,
        upstream_tasks=[check],
    )
    remove_duplicates(board=board_param, upstream_tasks=[upload])

if __name__ == "__main__":
    flow.visualize()

在此处输入图像描述

于 2021-12-21T11:49:59.727 回答