0

我有这个简单的 python 脚本。我怎样才能以适用于 dagster 的方式重写它?

import logging

from mypackage import function1, function2, function3, function4, function5


def main():
    try:
        function1()
        function2()
    except Exception as e:
        logging.exception(e)
        function4()
    else:
        function5()

if __name__ == '__main__:
    main()

这是我到目前为止一直在尝试的,但还有很长的路要走:

import logging

from dagster import success_hook, failure_hook
from mypackage import function1, function2, function3, function4, function5


@solid
def dag_function1() -> bool:
    myvar1 = True
    function1()
    return myvar1


@solid
def dag_function2() -> bool:
    myvar2 = True
    function2()
    return myvar2


@solid
def dag_function3() -> bool:
    myvar3 = True
    function3()
    return myvar3


@failure_hook
def dag_function5():
    logging.exception('NOT SURE HOW TO ACCESS MY EXCEPTION')
    function5()


@success_hook
def dag_function4():
    function4()


def main():
    dag_function3(dag_function1(), dag_function2())

我尝试过类似的方法,但 dagster 抛出错误 dagster.core.errors.DagsterInvariantViolationError: No jobs, pipelines, graphs, or repositories found

4

1 回答 1

3

为了将实体的输出传递给其他实体的输入,您需要创建一个管道来定义输入和输出之间的依赖关系。

从那里,您将能够执行管道:

import logging

from dagster import success_hook, failure_hook, solid, pipeline, execute_pipeline

from mypackage import function1, function2, function3, function4, function5


@solid
def dag_function1() -> bool:
    myvar1 = True
    function1()
    return myvar1


@solid
def dag_function2() -> bool:
    myvar2 = True
    function2()
    return myvar2


@solid
def dag_function3(input_1, input_2) -> bool:
    myvar3 = True
    function3()
    return myvar3


@failure_hook
def dag_function5(context):
    logging.exception(context.solid_exception)
    function5()


@success_hook
def dag_function4(context):
    pass


@pipeline(hook_defs={dag_function5, dag_function4})
def my_pipeline():
    dag_function3(dag_function1(), dag_function2())

if __name__ == '__main__':
    execute_pipeline(my_pipeline)

话虽如此,自 0.13.0 以来,Dagster 已迁移到一组新的核心 API(包括作业、操作和图表)。此处有一份迁移指南,详细说明了实体和管道如何映射到操作和作业。

于 2022-02-18T21:53:01.003 回答