0

我需要将文件从 s3 下载到 aiflow tmp\test\ 文件夹以处理 python 脚本并将文件加载回 s3。

Step1:创建文件夹
Step2:使用 bash 脚本下载文件 1
Step3:使用 bash 脚本下载文件 2
Step4:使用 bash 脚本下载文件 3
Step5:使用 bash 脚本下载文件 4
Step6:使用 bash 脚本下载文件 5
Step7:执行 python 脚本
第 8 步:将 python 输出加载到 s3 第 2 步到第 6 步(下载)在 DAG 中并行运行。

我创建了 DAG,它第一次运行良好。当我第二次下载很少的文件并且由于找不到模板/tmp/test/而导致很少的下载步骤(步骤 3 和步骤 5)异常时。

DEFAULT_ARGS = {
 'depends_on_past': False,
 'start_date': '2021-03-22'
}

with DAG(
  dag_id='generate_dag',
  schedule_interval='0 0/1 * * *',
default_args=DEFAULT_ARGS,
template_searchpath=['/tmp/test/']
) as dag:
remove_temp_test = BashOperator(   
    task_id="remove_temp_test_dir",
    bash_command=f"rm -rf /tmp/test"
)
get_usafacts_cases = BashOperator(
    task_id="get_usafact "url" --create-dirs -o 
/tmp/test/{cases_file}"
)


get_etl_script = S3DownloadOperator(
    task_id='get_etl_script_s3',
    s3_bucket='{{ var.json.etlconfig.SCRIPT_BUCKET }}',
    s3_prefix='/scripts/test.py',
    outpath='/tmp/test/',
    outfile='test.py'
)

get_county_pop_data = S3DownloadOperator(
    task_id="get_county_pop_s3",
    s3_bucket='{{ var.json.etlconfig.LAKE_BUCKET }}',
    s3_prefix='/test1.csv',
    outpath='/tmp/test/',
    outfile='test1.csv'
)

get_congdon_data = S3DownloadOperator(
    task_id="get_congdon_s3",
    s3_bucket='{{ var.json.etlconfig.LAKE_BUCKET }}',
    s3_prefix='/test2.csv',
    outpath='/tmp/test/',
    outfile='test2.csv'
)

get_hud_crosswalk = S3DownloadOperator(
    task_id="get_hud_crosswalk_s3",
    s3_bucket='{{ var.json.etlconfig.LAKE_BUCKET }}',
    s3_prefix='/test3.csv',
    outpath='/tmp/test/',
    outfile='test3.csv'
)

get_cbo_definition = S3DownloadOperator(
    task_id="get_cbo_file_s3",
    s3_bucket='{{ var.json.etlconfig.LAKE_BUCKET }}',
    s3_prefix='/test4.csv',
    outpath='/tmp/test/',
    outfile='test4.csv'
)

run_etl_script = BashOperator(
    task_id="run_etl_script_local",
    bash_command=f"python3 /tmp/test/{etl_script}"
)

remove_temp_test >> get_usafacts_cases >> get_usafacts_deaths >> get_etl_script >> get_county_pop_data  >> get_congdon_data  >> get_hud_crosswalk >> get_cbo_definition >> run_etl_script
4

0 回答 0