1

我有以下任务一直在运行我知道这一点,因为它在 Snowflake 中运行查询并且我不断收到 DUO 推送通知。每一个。5秒!我能做些什么来阻止它并且只在 DAG 运行时运行它

这是任务:

create_foreign_keys = SnowflakeQueryOperator(
    dag=dag,
    task_id='check_and_run_foreign_key_query',
    sql=SnowHook().run_fk_alter_statements(schema,query),
    trigger_rule=TriggerRule.ALL_DONE
)

这是在 sql 部分中调用的方法:

def run_fk_alter_statements(self, schema, additional_fk):

    fk_query_path = "/fkeys.sql"

    fd = open(f'{fk_query_path}', 'r')
    query = fd.read()
    fd.close()

    additions = []


    for fk in additional_fk:
            additions.append(f""" or (t2.table_name = '{fk['table_name']}' and t2.column_name = '{fk['column_name']}'
                            and t1.table_name = '{fk['ref_table_name']}' and t1.column_name = '{fk['ref_column_name']}')\n""".upper())


    raw_out = self.execute_query(query.format(schema=schema, fks=''.join(additions)), fetch_all=True)

    query_jobs = []
    for raw_query in raw_out:
        query_jobs.append(raw_query[0])


    return query_jobs
4

1 回答 1

1

sql=SnowHook().run_fk_alter_statements(schema,query)实例化中的调用实际上SnowflakeQueryOperator是顶级代码,因此每次调度程序解析 DAG 时都会执行它。您需要找到一种方法来在操作员的execute()方法中调用该函数。

您可以添加一个 TaskFlow 函数/PythonOperator任务来将输出推run_fk_alter_statements()送到XCom,然后SnowflakeQueryOperator使用它XCom来执行生成的 SQL。

于 2021-11-17T03:50:47.450 回答