2

我有一个使用.map(); 因此,我“循环”了多个输入,但是一些输入我只需要生成一次,但我注意到我的流程不断重新生成它们。

是否可以在运行期间缓存/检查任务的结果(用于其他任务)?

我的理解是可以缓存特定的时间,如下所示:

import datetime

from prefect import task

@task(cache_for=datetime.timedelta(hours=1))
def some_task():
    ...

但是,如果运行时间少于cache_for时间,缓存是否仍会保留下一次运行(如果不是,我猜长时间缓存会起作用)。

4

1 回答 1

3

是的,有几种不同的方法可以实现这种类型的缓存:

使用不同的缓存验证器

除了配置缓存过期(如您在上面所做的那样)之外,您还可以选择配置缓存验证器。在您的情况下,您可以使用输入或参数验证器。

使用缓存键

您可以通过在任务上指定 a 在任务之间(在单个流内和跨流)“共享”缓存cache_key

@task(cache_for=datetime.timedelta(hours=1), cache_key="my-key")
def some_task():
    ...

Cached然后,这将通过键而不是任务 ID查找您的候选状态。

使用基于文件的目标

最后,越来越流行的设置是使用基于文件的target任务flow_run_id然后,您可以使用诸如提供给您的任务的输入之类的内容来模板化此目标字符串。每当任务运行时,它首先检查指定目标位置是否存在数据,如果找到,则不会重新运行。例如:

@task(target="{flow_run_id}/{scheduled_start_time:%Y-%d-%m}/results.bytes")
def some_task():
    ...

如果以下两个都为真,则此模板具有重用目标数据的效果:

  • 任务在同一天内重新运行
  • 该任务作为同一流程运行的一部分重新运行

然后,您可以跨多个任务(或在您的情况下,跨所有映射的子项)共享此模板。

请注意,您还可以根据需要为target模板提供输入和参数。

于 2020-10-05T20:53:56.533 回答