1

我需要在循环中重复一些任务(LOOP)。我从这个答案中得到了想法,作为可能的解决方案之一。当我从 VS Code 终端或通过 Jupyter 启动代码时 - 一切正常。但是,如果我通过 web UI Prefect "Quick Run" 运行,它会给出错误: 错误图片

我的代码示例:

from prefect import task, Flow, case, Parameter
from prefect.tasks.control_flow import merge
from prefect.tasks.notifications.email_task import EmailTask
from prefect.storage import Local
from prefect.engine import signals
from prefect.engine.signals import FAIL, RETRY, SUCCESS, LOOP, SKIP
from prefect.engine.results import LocalResult, PrefectResult

from datetime import datetime, timedelta


    
@task(name='loop_checking_files_exist', result=PrefectResult())
def loop_cheking_files_exists(run_date):
    
    with Flow('subflow', result=PrefectResult()) as subflow:
        date_diff, last_file_date = date_difference.run(run_date)
        
        with case(date_diff, True):
            new_date = last_file_date + timedelta(days = 1)
            
            cond_log_exist = check_file_exists(path_ftp, log_file_name)

            with case(cond_log_exist, True):
                false_result = cond_log_exist
                row_count, md5_sum = check_log_result(path_ftp, log_file_name)
                merging = merge(row_count, md5_sum)
                download = file_download(zip_file_name, path_ftp, path_interim, upstream_tasks=[merging])
                unzip = unzip_sku_file(path_interim, zip_file_name, upstream_tasks=[download])
                check_skunames_file = check_row_md5(path_interim, csv_file_name, row_count, md5_sum, upstream_tasks=[unzip])
            
                with case(check_skunames_file, True):
                    upload = file_download(zip_file_name, path_interim, path_pri+f'\{new_date_yyyy}010{new_date_mm}', remove=True)
                    msg_delete_csv_file = delete_file(csv_file_name, path_interim, upstream_tasks=[upload])
                    write_zipname = write_zip_name(zip_file_name, upstream_tasks=[msg_delete_csv_file])

                with case(check_skunames_file, False):
                    print('check_skunames_file, False')
                    fail_check_skunames = fail_email_notification()

                new_date_diff, last_file_date = date_difference(run_date, upstream_tasks=[write_zipname])
        
                success_notification = success_email_notification(zip_file_name, upstream_tasks=[write_zipname])
    
            with case(cond_log_exist, False):
                fail_notification = fail_email_notification(log_file_name, upstream_tasks=[cond_log_exist])
                false_result = action_if_false(upstream_tasks=[fail_notification])

    subflow_run_id = subflow.run()
    child_data = subflow_run_id.result[new_date_diff]._result.value

    if child_data == True:
        raise LOOP(message=f'Child data = {child_data}')
    else: 
        raise FAIL()

with Flow('Main Flow Name', result=PrefectResult()) as flow:
    
    date = datetime.now()

    loop_task = loop_cheking_files_exists(date)

    with case(loop_task, True):
        success_notification = success_email_notification()

    with case(loop_task, False):
        fail_notification = fail_email_notification()


# %%
if __name__ == "__main__":
    flow.register('Project_Name')
    flow.run()
4

0 回答 0