我需要在循环中重复执行一些任务(使用 Prefect 的 LOOP 信号)。这个思路来自这个答案,是可能的解决方案之一。当我从 VS Code 终端或通过 Jupyter 启动代码时,一切正常。但是,如果通过 Prefect Web UI 的 "Quick Run" 运行,就会报错:
我的代码示例:
from prefect import task, Flow, case, Parameter
from prefect.tasks.control_flow import merge
from prefect.tasks.notifications.email_task import EmailTask
from prefect.storage import Local
from prefect.engine import signals
from prefect.engine.signals import FAIL, RETRY, SUCCESS, LOOP, SKIP
from prefect.engine.results import LocalResult, PrefectResult
from datetime import datetime, timedelta
@task(name='loop_checking_files_exist', result=PrefectResult())
def loop_cheking_files_exists(run_date):
    """Build and run a nested ``subflow``, then raise LOOP or FAIL from its result.

    The task assembles a Prefect flow named ``'subflow'``, runs it in-process
    with ``subflow.run()``, reads the value stored for ``new_date_diff`` in the
    returned state, and converts that value into a control-flow signal.

    Args:
        run_date: Value forwarded to ``date_difference`` (its expected type is
            not visible in this file section).

    Raises:
        LOOP: When the value read for ``new_date_diff`` equals ``True``.
        FAIL: In every other case.
    """
    # NOTE(review): the nesting below was reconstructed from a paste that had
    # lost its indentation; it follows the data dependencies between the
    # statements -- confirm against the original file.
    with Flow('subflow', result=PrefectResult()) as subflow:
        # ``.run`` is called on the task object directly here, unlike the
        # other task calls below, which are invoked as plain calls.
        date_diff, last_file_date = date_difference.run(run_date)
        with case(date_diff, True):
            new_date = last_file_date + timedelta(days=1)
            cond_log_exist = check_file_exists(path_ftp, log_file_name)

            with case(cond_log_exist, True):
                false_result = cond_log_exist
                row_count, md5_sum = check_log_result(path_ftp, log_file_name)
                merging = merge(row_count, md5_sum)
                download = file_download(
                    zip_file_name, path_ftp, path_interim,
                    upstream_tasks=[merging],
                )
                unzip = unzip_sku_file(
                    path_interim, zip_file_name, upstream_tasks=[download]
                )
                check_skunames_file = check_row_md5(
                    path_interim, csv_file_name, row_count, md5_sum,
                    upstream_tasks=[unzip],
                )

                with case(check_skunames_file, True):
                    # The backslash is written as ``\\``: the original ``\{``
                    # is an invalid escape sequence that only happens to yield
                    # the same text (and emits a warning on recent Pythons).
                    upload = file_download(
                        zip_file_name,
                        path_interim,
                        path_pri + f'\\{new_date_yyyy}010{new_date_mm}',
                        remove=True,
                    )
                    msg_delete_csv_file = delete_file(
                        csv_file_name, path_interim, upstream_tasks=[upload]
                    )
                    write_zipname = write_zip_name(
                        zip_file_name, upstream_tasks=[msg_delete_csv_file]
                    )

                with case(check_skunames_file, False):
                    # NOTE(review): a bare print inside a Flow block presumably
                    # executes while the flow is being built, not when this
                    # branch runs -- confirm that this is intended.
                    print('check_skunames_file, False')
                    fail_check_skunames = fail_email_notification()

                new_date_diff, last_file_date = date_difference(
                    run_date, upstream_tasks=[write_zipname]
                )
                success_notification = success_email_notification(
                    zip_file_name, upstream_tasks=[write_zipname]
                )

            with case(cond_log_exist, False):
                fail_notification = fail_email_notification(
                    log_file_name, upstream_tasks=[cond_log_exist]
                )
                false_result = action_if_false(upstream_tasks=[fail_notification])

    # Run the nested flow in-process and read the value recorded for
    # ``new_date_diff`` (through the private ``_result`` attribute).
    subflow_run_id = subflow.run()
    child_data = subflow_run_id.result[new_date_diff]._result.value

    # Explicit comparison kept on purpose: only a value equal to True asks
    # Prefect to loop; anything else fails the task.
    if child_data == True:
        raise LOOP(message=f'Child data = {child_data}')
    else:
        raise FAIL()
# Main flow: run the looping task and branch on its result.
with Flow('Main Flow Name', result=PrefectResult()) as flow:
    # NOTE(review): datetime.now() is evaluated when this block executes
    # (i.e. while the flow is being built), presumably not on every flow
    # run -- confirm that a build-time timestamp is what is wanted.
    date = datetime.now()
    loop_task = loop_cheking_files_exists(date)

    # Send the success e-mail when the looping task yields True ...
    with case(loop_task, True):
        success_notification = success_email_notification()

    # ... and the failure e-mail when it yields False.
    with case(loop_task, False):
        fail_notification = fail_email_notification()
# %%
if __name__ == "__main__":
    # Register the flow under the 'Project_Name' project, then execute it
    # once locally in this process.
    flow.register('Project_Name')
    flow.run()