问题标签 [prefect]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 如何在失败时恢复 Prefect 流程而无需重新运行整个流程?
TL;博士;
我无法使用 prefect 的FlowRunner来解决上述问题。我可能要么用错了(见下文),要么错过了一些东西。真的很感激任何指点!
问题
我通读了优秀的核心文档,发现处理失败和本地调试的部分与此最相关(可能遗漏了一些东西!)。FlowRunner类似乎(对我来说)是解决方案。
看看我是否可以使用 Flow Runner 来恢复失败的流程:
- 进行了失败的流程运行:
- 在 iPython 中运行它并保存状态:
将 1 / 0 替换为 1 / 1
failure()
这样任务就会成功:最后将先前的状态传递给
flow_runner
希望它会恢复流程:
整个流程再次运行,包括 3 秒成功的任务。
workflow - 从外部触发 Prefect 工作流
我目前有一个在 EC2 实例上本地运行的 Prefect 工作流。我可以通过 UI 在 localhost:8080 上触发我的工作流程。有没有办法通过 REST API 或其他方式从外部触发 Prefect 工作流程(比如 AWS Lambda)?
我知道 Airflow 支持实验性的 REST API。
prefect - Prefect 如何同时扩展数千个工作流?
我有一个在本地运行的完美服务器(0.13 核心版本)。我在具有 64 GB RAM 和 32 个 CPU 核心的服务器机器上循环调用 flow.run() 100 万次。当它运行到大约 300 次时,它开始从 GraphQL 抛出连接拒绝错误。
我仍在考虑是否在我的工作流程中使用 Prefect,但看起来它占用了太多 RAM。Prefect 如何同时扩展数千个工作流?
我用一个简单的例子运行工作流程:
然后我调用 REST API 来触发从 1 到 1000 的流。
我知道 GraphQL 正在使用大量内存(最多 10 GB RAM)。然后 Prefect UI 开始徘徊在 100 左右。
我不确定我是否将 Prefect 工作流程用作其预期用途,但如果可能的话,我想解决这个问题。
python - 完善如何避免重新运行任务
在 Prefect 中,假设我有一些管道为列表中的每个日期运行 f(date),并将其保存到文件中。这是一个非常常见的 ETL 操作。在气流中,如果我运行一次,它将回填所有历史日期。如果我再次运行它,它将知道该任务已经运行,并且只运行任何出现的新任务(即最近日期)。
在 Prefect 中,据我所知,它每天都会运行整个管道,即使前一天完成了 99% 的任务。在不切换到 Prefect Cloud 的情况下,有哪些解决方案可以解决这个问题?你只是在退出之前做一些事情,比如让每个任务缓存它在redis中完成的事情吗?
prefect - 如何永久设置本地 Prefect 配置文件?
根据Prefect
配置指南,我通过 设置了一个本地配置文件(即在我当前的工作目录中)export PREFECT__USER_CONFIG_PATH="./config.toml"
,但是,这需要在每个 shell 会话中设置环境变量。
我尝试在 Python 脚本中设置配置,prefect.config.user_config_path = "./config.toml"
但这似乎与加载配置文件的方式不同,export
因为:
(来自文档)“配置文件在 Prefect 首次导入时被解析,并且在 prefect.config 中可作为活动对象使用。”
对于我的虚拟config.toml
数据:
prefect.config
通过环境变量:
prefect.config
通过脚本:
所以config.toml
在第二种情况下不会加载变量
有任何想法吗?
(应用程序:我想在我的本地config.toml
文件中设置一个“完美”秘密,隐藏在源代码管理中)
python - 从 UI 运行时完美 ModuleNotFoundError
我正在关注 Prefect 教程,网址为:https ://docs.prefect.io/core/tutorial/01-etl-before-prefect.html 。代码可以从 git 下载:https ://github.com/PrefectHQ/prefect/tree/master/examples/tutorial
教程依赖于飞机库,它是教程下的目录。我可以通过终端执行流程:
它执行完美!
我创建了一个项目,并将 Flow 添加到该项目中。通过 Prefect Server UI,我可以运行 Flow,但它失败并显示错误消息:
状态消息:无法加载和执行 Flow 的环境:ModuleNotFoundError("No module named 'aircraftlib'")
通过 Prefect Server UI 执行流程时,我应该如何处理依赖关系?
etl - 有没有办法为 Prefect 中的新 Flow 回填历史数据(一次)?
我刚开始阅读有关 Prefect 的文章(并且对使用 Airflow 有一点经验)。
我的目标是设置一个每天在 Prefect 中运行的任务,并将数据收集到一个文件夹中(我想这是 Prefect 可以肯定地帮助我做的事情)。我的目标也是填充历史数据(就像我及时运行这项工作一样)。
在 Airflow 中有这个 start_date 的概念,当它在过去设置时,DAG 将从该日期开始运行并在每个时间间隔填充。
例如,如果我有一个任务需要一个日期并返回该日期的数据,例如:
在 Prefect 中是否有本地方法可以做到这一点?尽管此处提到了回填,但我在此处或文档中的任何地方都找不到此答案。任何帮助/指导都将非常有用。
我尝试了什么:
当我设置schedule
为:
然后我flow.run()
就得到了:
我期望的是从start_date
我提供的开始运行,然后暂停直到它到达当前时间并等待下一个时间表。
python - 完善如何等待外部依赖
我有一个完美的流程,我想在出现特定文件时运行。使用 Luigi 之类的东西,您将创建一个输出该文件的 ExternalTask,然后对其施加依赖。Prefect 中的标准模式是什么?