是的,有几种不同的方法可以实现这种类型的缓存:
使用不同的缓存验证器
除了配置缓存过期(如您在上面所做的那样)之外,您还可以选择配置缓存验证器。在您的情况下,您可以使用输入或参数验证器。
使用缓存键
您可以通过在任务上指定 a 在任务之间(在单个流内和跨流)“共享”缓存cache_key
:
@task(cache_for=datetime.timedelta(hours=1), cache_key="my-key")
def some_task():
...
Cached
然后,这将通过键而不是任务 ID查找您的候选状态。
使用基于文件的目标
最后,越来越流行的设置是使用基于文件的target
任务。flow_run_id
然后,您可以使用诸如提供给您的任务的输入之类的内容来模板化此目标字符串。每当任务运行时,它首先检查指定目标位置是否存在数据,如果找到,则不会重新运行。例如:
@task(target="{flow_run_id}/{scheduled_start_time:%Y-%d-%m}/results.bytes")
def some_task():
...
如果以下两个都为真,则此模板具有重用目标数据的效果:
- 任务在同一天内重新运行
- 该任务作为同一流程运行的一部分重新运行
然后,您可以跨多个任务(或在您的情况下,跨所有映射的子项)共享此模板。
请注意,您还可以根据需要为target
模板提供输入和参数。