我正在尝试以动态方式传递 postgres 运算符中的参数。
获取 id 列表 (get_query_id_task)
传递 id 列表以获取并执行查询 (get_query_text_task)
get_query_id_task = PythonOperator( task_id='get_query_id', python_callable=query_and_push, #provide_context=True, op_kwargs={ 'sql' : read_sql('warmupqueryid.sql') } ) get_query_text_task= PostgresOperator( task_id='get_query_text', postgres_conn_id='redshift', trigger_rule=TriggerRule.ALL_DONE, params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }}"}, sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT FROM stl_querytext WHERE query in {{ macros.custom_macros.render_list_sql(params.query_ids) }};""", )
Xcom push 返回查询列表如下:
[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]
我使用插件来渲染 xcom 推送:
def render_list_sql(li):
l = []
for index, tup in enumerate(li):
idd = tup[0]
return tuple(l)
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "custom_macros"
macros = [render_list_sql]
使用下面提供的解决方案已解决了该问题。但是我无法在 id 列表上进行循环。所以它只是向我显示一个ID。而且我不确定如何循环遍历 id。
