0

我正在尝试在执行数据流作业的 Airflow 2.1.2 中运行作业。数据流作业从存储桶中读取数据并将其上传到 bigquery。DAG 中的 dataflow_default_options 区域定义为 europe-west1,但它被 DAG 中的实际作业覆盖到 us-central1。因此,数据流作业在大查询上传时失败,因为该区域是 us-central1

在我使用旧版本的气流(1.10.15)之前它工作正常。下面的代码:

DEFAULT_DAG_ARGS = {
    'start_date': YESTERDAY,
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'project_id': models.Variable.get('gcp_project'),
    'dataflow_default_options': {
        'region': 'europe-west1',
        'project': models.Variable.get('gcp_project'),
        'temp_location': models.Variable.get('gcp_temp_location'),
        'runner': 'DataflowRunner',
        'zone': 'europe-west1-d'
    }
}

with models.DAG(dag_id='GcsToBigQueryTriggered',
                description='A DAG triggered by an external Cloud Function',
                schedule_interval=None,
                default_args=DEFAULT_DAG_ARGS,
                max_active_runs=1) as dag:
    # Args required for the Dataflow job.
    job_args = {
        'input': 'gs://{{ dag_run.conf["bucket"] }}/{{ dag_run.conf["name"] }}',
        'output': models.Variable.get('bq_output_table'),
        'fields': models.Variable.get('input_field_names'),
        'load_dt': DS_TAG
    }

    # Main Dataflow task that will process and load the input delimited file.
    dataflow_task = dataflow_operator.DataFlowPythonOperator(
        task_id="data-ingest-gcs-process-bq",
        py_file=DATAFLOW_FILE,
        options=job_args)

如果我将 dataflow_task 选项中的区域更改为 europe-west1,则 Dataflow 作业会通过,但是它在 Airflow 中失败并显示 404 错误代码,因为它等待错误区域(us-central1)中数据流作业的 JOB_DONE 状态.

我错过了什么吗?任何帮助将不胜感激?

4

0 回答 0