我正在尝试在执行数据流作业的 Airflow 2.1.2 中运行作业。数据流作业从存储桶中读取数据并将其上传到 bigquery。DAG 中的 dataflow_default_options 区域定义为 europe-west1,但它被 DAG 中的实际作业覆盖到 us-central1。因此,数据流作业在大查询上传时失败,因为该区域是 us-central1
在我使用旧版本的气流(1.10.15)之前它工作正常。下面的代码:
DEFAULT_DAG_ARGS = {
'start_date': YESTERDAY,
'email': models.Variable.get('email'),
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'project_id': models.Variable.get('gcp_project'),
'dataflow_default_options': {
'region': 'europe-west1',
'project': models.Variable.get('gcp_project'),
'temp_location': models.Variable.get('gcp_temp_location'),
'runner': 'DataflowRunner',
'zone': 'europe-west1-d'
}
}
with models.DAG(dag_id='GcsToBigQueryTriggered',
description='A DAG triggered by an external Cloud Function',
schedule_interval=None,
default_args=DEFAULT_DAG_ARGS,
max_active_runs=1) as dag:
# Args required for the Dataflow job.
job_args = {
'input': 'gs://{{ dag_run.conf["bucket"] }}/{{ dag_run.conf["name"] }}',
'output': models.Variable.get('bq_output_table'),
'fields': models.Variable.get('input_field_names'),
'load_dt': DS_TAG
}
# Main Dataflow task that will process and load the input delimited file.
dataflow_task = dataflow_operator.DataFlowPythonOperator(
task_id="data-ingest-gcs-process-bq",
py_file=DATAFLOW_FILE,
options=job_args)
如果我将 dataflow_task 选项中的区域更改为 europe-west1,则 Dataflow 作业会通过,但是它在 Airflow 中失败并显示 404 错误代码,因为它等待错误区域(us-central1)中数据流作业的 JOB_DONE 状态.
我错过了什么吗?任何帮助将不胜感激?