我正在尝试为该source_objects
字段提供一个字符串列表,GoogleCloudStorageToBigQueryOperator
但使用以下代码时出现错误:
字符串索引必须是整数,而不是 unicode
我不知道的事情:
- 如何在 DAG 范围内获得 XCOM的
return
值?get_file_name
- 如何
xcom_pull
在 DAG 范围内调用函数而无需提供上下文?在我看来,任务实例不需要提供上下文。
我想到的事情:
- 重写操作符并将 XCOM 作为参数
我想做的事情:
- 我希望能够打电话给接线员
另外,操作员的某些字段似乎使用了一个名为 的功能templated_field
,模板字段背后的机制是什么?不只是为了PythonOperator
andBashOperator
吗?
最后一个,为什么不PythonOperator
返回一个TaskInstance
?
with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:
def get_file_name_from_conf(ds, **kwargs):
fileName = kwargs['dag_run'].conf['fileName']
return [fileName]
get_file_name = PythonOperator(
task_id='get_file_name',
provide_context=True,
python_callable=get_file_name_from_conf)
# t1, t2 and t3 are examples of tasks created by instantiating operators
bq_load = GoogleCloudStorageToBigQueryOperator(
task_id='bq_load',
bucket='src_bucket',
#source_objects=['data.csv'],
source_objects=get_file_name.xcom_pull(context='', task_ids='get_file_name'),
destination_project_dataset_table='project:dataset.table',
write_disposition='WRITE_EMPTY')
bq_load.set_upstream(get_file_name)
我对 Python 和 Airflow 有点陌生。我猜这些事情应该是微不足道的。我确定我在这里误解了一些东西。