2

这是我在 Stack 上的第一篇文章,是关于 Airflow 的。我需要实现一个 DAG,它将:

1/ 从 API 下载文件

2/ 将它们上传到 Google Cloud Storage

3/ 将它们插入 BigQuery

问题是第 1 步涉及大约 170 个要调用的帐户。如果在下载过程中出现任何错误,我希望我的 DAG 从异常结束的步骤自动重试。因此,我在我的任务之上实现了一个循环,例如:

dag = DAG('my_dag', default_args=DEFAULT_ARGS)

for account in accounts:

    t1 = PythonOperator(task_id='download_file_' + account['id'],
                 python_callable=download_files(account),
                 dag=my_dag)

    t2 = FileToGoogleCloudStorageOperator(task_id='upload_file_' + account['id'],
                google_cloud_storage_conn_id = 'gcs_my_conn',
                src = 'file_'  + account['id'] + '.json',
                bucket = 'my_bucket',
                dag=my_dag)

    t3 = GoogleCloudStorageToBigQueryOperator(task_id='insert_bq',
                bucket = 'my_bucket',
                google_cloud_storage_conn_id = 'gcs_my_conn',
                bigquery_conn_id = 'bq_my_conn',
                src = 'file_'  + account['id'],
                destination_project_dataset_table = 'my-project:my-dataset.my-table',
                source_format = 'NEWLINE_DELIMITED_JSON',
                dag=my_dag)

    t2.set_upstream(t1)
    t3.set_upstream(t2)

所以在 UI 级别,我有大约 170 个每个任务显示的实例。当我手动运行 DAG 时,就我所见,Airflow 什么也没做。DAG 不会初始化或排队任何任务实例。我想这是由于涉及的实例数量,但我不知道如何解决这个问题。

我应该如何管理这么多任务实例?

谢谢,

亚历克斯

4

2 回答 2

0

您目前如何运行气流?你确定airflow scheduler正在运行?

您还可以运行airflow list_dags以确保可以编译 dag。如果你使用 Celery 运行气流,你应该注意你的 dag 出现list_dags在所有运行气流的节点上。

于 2017-07-25T08:30:19.917 回答
0

亚历克斯,在这里发帖会更容易,我看到你有 DEFAULT_ARGS 重试是在 DAG 级别,你也可以在任务级别设置重试。它在 BaseOperator 中,因为所有 Operator 都将继承 BaseOperator 然后您可以使用它,您可以在此处找到更多详细信息:https ://github.com/apache/incubator-airflow/blob/master/airflow/operators/python_operator.py和https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1864,如果你在模型中检查 BaseOperator ,它有retriesand retry_delay,你可以这样做:

t1 = PythonOperator(task_id='download_file_' + account['id'],
                 python_callable=download_files(account),
                 retries=3,
                 retry_delay=timedelta(seconds=300),
                 dag=my_dag)
于 2017-08-01T12:04:04.130 回答