参考这个,气流传感器允许我们在运行下一个任务之前检查一个标准。鉴于用户设置了超时和另一个标志,有没有办法标记成功终止传感器?
在我的用例中,我必须通过传感器检查条件,但只能在我希望 DAG / followings 任务正常运行的特定时间范围内。
参考这个,气流传感器允许我们在运行下一个任务之前检查一个标准。鉴于用户设置了超时和另一个标志,有没有办法标记成功终止传感器?
在我的用例中,我必须通过传感器检查条件,但只能在我希望 DAG / followings 任务正常运行的特定时间范围内。
您可以通过创建自定义传感器类来做到这一点。您将需要覆盖该poke
功能并放置您希望设置的逻辑。例如:
from airflow.sensors.sql import SqlSensor
class MySqlSensor(SqlSensor):
def is_time_frame(self):
# TODO: implement a function that returns True if we want to ignore the sensor
def poke(self, context):
if self.is_time_frame():
return True
super().poke(context)
在此示例中,当传感器戳它时,首先检查时间窗口。如果当前时间在窗口内,则传感器将返回 True 并退出。对于任何其他情况,传感器将完成它的工作 - 在该特定示例中运行 SQL 查询,直到查询返回 True。