0

我是 Google Cloud Composer 的新手,在我创建的 DAG 中遇到了一个似乎很奇怪的问题。我有一个从云存储中获取 tar.gz 文件的过程,将其重新压缩为 .gz 文件,然后将 .gz 文件加载到 BigQuery。昨天,我尝试在该过程中添加一个新步骤,即从创建的“分片”插入新表。

在我更改 DAG 执行中的步骤顺序之前,我无法让它工作。在我的 DAG 中,我有一个名为“delete_tar_gz_files_op”的步骤。当它在“insert_daily_file_into_nli_table_op”之前执行时,插入从未运行(在 Composer 中没有失败,只是似乎根本没有运行)。当我交换这两个步骤的顺序而不对代码进行其他更改时,插入按预期工作。有谁知道这可能是什么原因?我不知道为什么会发生这种情况,因为这两个步骤根本不相关。一个从一个大查询表到另一个执行插入查询。另一个删除云存储中的 tar.gz 文件。

我的 dag 执行顺序目前有效:

initialize >>  FilesToProcess >> download_file >> convert_task >> upload_task >> gcs_to_bq >> archive_files_op >> insert_daily_file_into_nli_table_op >> delete_tar_gz_files_op

使用的一些代码:

    #The big query operator inserts the files from the .gz file into a table in big query.
    gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id='load_basket_data_into_big_query'+job_desc,
    bucket="my-processing-bucket",
    bigquery_conn_id='bigquery_default',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    compression='GZIP',
    source_objects=['gzip/myzip_'+process_date+'.gz'],
    destination_project_dataset_table='project.dataset.basket_'+clean_process_date,
    field_delimiter='|',
    skip_leading_rows=0,
    google_cloud_storage_conn_id="bigquery_default",
    schema_object="schema.json",
    dag=dag
    )

    #The created shard is then inserted into basket_raw_nli.basket_nli.  This is a partitioned table which contains only the NLI subtype
    insert_daily_file_into_nli_table_op = bigquery_operator.BigQueryOperator(
    task_id='insert_daily_file_into_nli_table_op_'+job_desc,
    bql=bqQuery,
    use_legacy_sql=False,
    bigquery_conn_id='bigquery_default',
    write_disposition='WRITE_APPEND',
    allow_large_results=True,
    destination_dataset_table=False,
    dag=dag)

    #The tar file created can now be deleted from the raw folder
    delete_tar_gz_files_op=python_operator.PythonOperator(
    task_id='delete_tar_gz_files_'+job_desc,
    python_callable=delete_tar_gz_files,
    op_args=[file, process_date],
    provide_context=False, 
    dag=dag)

    def delete_tar_gz_files(file, process_date):
        execution_command='gsutil rm ' + source_dir + '/' + file     
        print(execution_command)
        returncode=os.system(execution_command)
        if returncode != 0:
            #logging.error("Halting process...")
            exit(1)

手动运行状态: 运行状态

4

0 回答 0