1

我正在尝试为该source_objects字段提供一个字符串列表,GoogleCloudStorageToBigQueryOperator但使用以下代码时出现错误:

字符串索引必须是整数,而不是 unicode

我不知道的事情:

  • 如何在 DAG 范围内获得 XCOM的return值?get_file_name
  • 如何xcom_pull在 DAG 范围内调用函数而无需提供上下文?在我看来,任务实例不需要提供上下文。

我想到的事情:

  • 重写操作符并将 XCOM 作为参数

我想做的事情:

  • 我希望能够打电话给接线员

另外,操作员的某些字段似乎使用了一个名为 的功能templated_field,模板字段背后的机制是什么?不只是为了PythonOperatorandBashOperator吗?

最后一个,为什么不PythonOperator返回一个TaskInstance

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        fileName = kwargs['dag_run'].conf['fileName']
        return [fileName]

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        #source_objects=['data.csv'], 
        source_objects=get_file_name.xcom_pull(context='', task_ids='get_file_name'), 
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_EMPTY')

    bq_load.set_upstream(get_file_name)

我对 Python 和 Airflow 有点陌生。我猜这些事情应该是微不足道的。我确定我在这里误解了一些东西。

4

1 回答 1

2

经过多次测试,我想出了这个解决方案,感谢 tobi6 的评论为我指明了正确的方向。我不得不使用template_fields功能。

当我试图返回一个带有单个字符串的列表时,我遇到了连接错误,所以我必须在我的 XCOM 中返回一个字符串,并用括号括住对 XCOM 的模板调用,以使结果成为一个列表。

这是最终代码:

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        return kwargs['dag_run'].conf['fileName']

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        source_objects=["{{ task_instance.xcom_pull(task_ids='get_file_name') }}"],
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_APPEND')

    bq_load.set_upstream(get_file_name)
于 2018-08-17T13:18:26.333 回答