7

在 Prefect 中,假设我有一些管道为列表中的每个日期运行 f(date),并将其保存到文件中。这是一个非常常见的 ETL 操作。在气流中,如果我运行一次,它将回填所有历史日期。如果我再次运行它,它将知道该任务已经运行,并且只运行任何出现的新任务(即最近日期)。

在 Prefect 中,据我所知,它每天都会运行整个管道,即使前一天完成了 99% 的任务。在不切换到 Prefect Cloud 的情况下,有哪些解决方案可以解决这个问题?你只是在退出之前做一些事情,比如让每个任务缓存它在redis中完成的事情吗?

4

1 回答 1

10

Prefect 有许多一流的缓存处理方法,具体取决于您想要多少控制。对于每个任务,您可以指定是否应缓存结果、应缓存多长时间以及应如何使缓存失效(年龄、任务的不同输入、流参数值等)。

缓存任务的最简单方法是使用targets,它允许您指定任务具有可模板化的副作用(通常是本地或云存储中的文件,但可以是例如数据库条目、redis 键或其他任何东西)。在任务运行之前,它会检查副作用是否存在,如果存在,则跳过运行。

例如,此任务会将其结果写入本地文件,该文件自动以任务名称和当前日期为模板:

@task(result=LocalResult(), target="{task_name}-{today}")
def get_data():
    return [1, 2, 3, 4, 5]

只要存在匹配文件,任务就不会重新运行。因为{today}是目标名称的一部分,所以它将隐式缓存任务的值一天。您还可以使用模板中的参数(例如回填日期)来复制 Airflow 的行为。

要获得更多控制,您可以通过在任何任务上设置、和来使用 Prefect 的完整缓存机制。如果设置,任务将在一个状态而不是一个状态下完成。当与 Prefect Server 或 Prefect Cloud 等适当的编排后端配对时,可以通过未来运行相同任务(或任何具有相同 的任务)来查询状态。未来的任务将返回状态作为它自己的结果。cache_forcache_validatorcache_keyCachedSuccessCachedcache_keyCached

于 2020-08-17T02:21:49.367 回答