这是我在 Stack 上的第一篇文章,是关于 Airflow 的。我需要实现一个 DAG,它将:
1/ 从 API 下载文件
2/ 将它们上传到 Google Cloud Storage
3/ 将它们插入 BigQuery
问题是第 1 步涉及大约 170 个要调用的帐户。如果在下载过程中出现任何错误,我希望我的 DAG 从异常结束的步骤自动重试。因此,我在我的任务之上实现了一个循环,例如:
dag = DAG('my_dag', default_args=DEFAULT_ARGS)
for account in accounts:
t1 = PythonOperator(task_id='download_file_' + account['id'],
python_callable=download_files(account),
dag=my_dag)
t2 = FileToGoogleCloudStorageOperator(task_id='upload_file_' + account['id'],
google_cloud_storage_conn_id = 'gcs_my_conn',
src = 'file_' + account['id'] + '.json',
bucket = 'my_bucket',
dag=my_dag)
t3 = GoogleCloudStorageToBigQueryOperator(task_id='insert_bq',
bucket = 'my_bucket',
google_cloud_storage_conn_id = 'gcs_my_conn',
bigquery_conn_id = 'bq_my_conn',
src = 'file_' + account['id'],
destination_project_dataset_table = 'my-project:my-dataset.my-table',
source_format = 'NEWLINE_DELIMITED_JSON',
dag=my_dag)
t2.set_upstream(t1)
t3.set_upstream(t2)
所以在 UI 级别,我有大约 170 个每个任务显示的实例。当我手动运行 DAG 时,就我所见,Airflow 什么也没做。DAG 不会初始化或排队任何任务实例。我想这是由于涉及的实例数量,但我不知道如何解决这个问题。
我应该如何管理这么多任务实例?
谢谢,
亚历克斯