1

参考这个,气流传感器允许我们在运行下一个任务之前检查一个标准。鉴于用户设置了超时和另一个标志,有没有办法标记成功终止传感器?

在我的用例中,我必须通过传感器检查条件,但只能在我希望 DAG / followings 任务正常运行的特定时间范围内。

4

1 回答 1

2

您可以通过创建自定义传感器类来做到这一点。您将需要覆盖该poke功能并放置您希望设置的逻辑。例如:

from airflow.sensors.sql import SqlSensor

class MySqlSensor(SqlSensor):

    def is_time_frame(self):
        # TODO: implement a function that returns True if we want to ignore the sensor

    def poke(self, context):
        if self.is_time_frame():
            return True
        super().poke(context)

在此示例中,当传感器戳它时,首先检查时间窗口。如果当前时间在窗口内,则传感器将返回 True 并退出。对于任何其他情况,传感器将完成它的工作 - 在该特定示例中运行 SQL 查询,直到查询返回 True。

于 2021-10-15T10:40:44.533 回答