0

我在建立一个完美的管道时遇到了麻烦。假设我有一个文件,称之为streamA.py 和streamB.py。这两个文件的目的是连续 24/7 流式传输数据,并且每 500 条流式传输的记录一次将数据推送到 redis 流中。
我创建了另一个名为 redis_to_postgres.py 的文件,它异步抓取 redis 流中的所有数据并将数据推送到 postgresql 并从我最近推送的 id 的 redis 流中清理内存。这是通过异步完成的。一旦上一个管道开始,我希望每 15 分钟定时一次。

这样做最实用的方法是什么?在这种情况下,我会创建 3 个单独的管道吗?一个用于streamA,一个用于streamB,第三个用于从redis读取并推送到postgresql并最终清理数据?或者我会创建一个管道以并行方式流式传输数据,而另一个管道只是读取并推送到 postgres?谢谢

4

1 回答 1

1

一个有趣的用例!你是问 Prefect ≤ 1.0 还是 Orion?对于 Orion,有一篇博客文章更详细地讨论了该问题并显示了示例流程。

但我假设您要求 Prefect ≤ 1.0。

为了从 Redis 读取数据并将其加载到 Postgres,比如说每 10 秒,您可以在 Prefect 任务中使用一个循环:

for iteration in range(1, 7):
  logger.info("iteration nr %s", iteration)
  read_from_redis_and_load_to_postgres() # your logic here
  if iteration < 6:
      logger.info("Sleeping for 10 seconds...")
      time.sleep(10)

这个流程可以安排为每分钟运行一次。这将为您提供重试、可观察性和所有 Prefect 功能,并且每 10 秒将数据加载到 Postgres 不应使您的数据库不堪重负。

但是对于您获取实时数据并将其持续加载到 Redis 流的部分,您可以将其作为单独的服务而不是 Prefect 流来运行,因为 Prefect 1.0 流更倾向于批处理并且预计会在某些时候结束点以判断流运行是否成功。如果您将它作为一个永无止境的完美流,它可能会丢失流心跳并被僵尸杀手进程杀死。因此,运行这部分可能会更容易,例如作为 24/7 运行的单独容器化服务。您可以将其部署为单独的 Kubernetes 部署或 ECS 服务。

它还取决于许多因素,包括。这段代码在做什么,这个 API 有多可靠(您从中提取数据的源系统是否有一些速率限制?为什么有 500 条记录?这 500 条记录的填充频率是多少以及您最终写入的频率是多少?雷迪斯?)。

话虽如此,我很想知道您是否可以在Orion中实现它,类似于博客文章示例所做的那样。我们目前正在收集有关 Orion 流式处理用例的反馈,因此如果您在 Orion 中实现此功能,我们很想听听您对此的反馈。

于 2022-02-23T12:22:38.460 回答