一个有趣的用例!你是问 Prefect ≤ 1.0 还是 Orion?对于 Orion,有一篇博客文章更详细地讨论了该问题并显示了示例流程。
但我假设您要求 Prefect ≤ 1.0。
为了从 Redis 读取数据并将其加载到 Postgres,比如说每 10 秒,您可以在 Prefect 任务中使用一个循环:
for iteration in range(1, 7):
logger.info("iteration nr %s", iteration)
read_from_redis_and_load_to_postgres() # your logic here
if iteration < 6:
logger.info("Sleeping for 10 seconds...")
time.sleep(10)
这个流程可以安排为每分钟运行一次。这将为您提供重试、可观察性和所有 Prefect 功能,并且每 10 秒将数据加载到 Postgres 不应使您的数据库不堪重负。
但是对于您获取实时数据并将其持续加载到 Redis 流的部分,您可以将其作为单独的服务而不是 Prefect 流来运行,因为 Prefect 1.0 流更倾向于批处理并且预计会在某些时候结束点以判断流运行是否成功。如果您将它作为一个永无止境的完美流,它可能会丢失流心跳并被僵尸杀手进程杀死。因此,运行这部分可能会更容易,例如作为 24/7 运行的单独容器化服务。您可以将其部署为单独的 Kubernetes 部署或 ECS 服务。
它还取决于许多因素,包括。这段代码在做什么,这个 API 有多可靠(您从中提取数据的源系统是否有一些速率限制?为什么有 500 条记录?这 500 条记录的填充频率是多少以及您最终写入的频率是多少?雷迪斯?)。
话虽如此,我很想知道您是否可以在Orion中实现它,类似于博客文章示例所做的那样。我们目前正在收集有关 Orion 流式处理用例的反馈,因此如果您在 Orion 中实现此功能,我们很想听听您对此的反馈。