I have this simple Python script. How can I rewrite it so that it works with dagster?
import logging
from mypackage import function1, function2, function3, function4, function5

def main():
    try:
        function1()
        function2()
    except Exception as e:
        logging.exception(e)
        function4()
    else:
        function5()

if __name__ == '__main__':
    main()
Here is what I have tried so far, but there is still a long way to go:
import logging
from dagster import solid, success_hook, failure_hook
from mypackage import function1, function2, function3, function4, function5

@solid
def dag_function1() -> bool:
    myvar1 = True
    function1()
    return myvar1

@solid
def dag_function2() -> bool:
    myvar2 = True
    function2()
    return myvar2

@solid
def dag_function3() -> bool:
    myvar3 = True
    function3()
    return myvar3

@failure_hook
def dag_function5():
    logging.exception('NOT SURE HOW TO ACCESS MY EXCEPTION')
    function5()

@success_hook
def dag_function4():
    function4()

def main():
    dag_function3(dag_function1(), dag_function2())
I tried something along these lines, but dagster raises the error dagster.core.errors.DagsterInvariantViolationError: No jobs, pipelines, graphs, or repositories found