问题标签 [prefect]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
airflow - 每个数据的气流 Dagrun 而不是预定的
我当前面临的问题是我在 MongoDB 集合中有文档,每个文档都需要由需要在非循环依赖图中运行的任务处理和更新。如果上游任务未能处理文档,则没有任何相关任务可以处理该文档,因为该文档尚未使用先决条件信息进行更新。
如果我要使用 Airflow,这给我留下了两个解决方案:
为每个文档触发一个 DAG,并使用 传递文档 ID
--conf
。问题在于这不是 Airflow 的预期使用方式。我永远不会运行预定的流程,根据文档在集合中的显示方式,我每天会制作 1440 个 Dagruns。每个时期运行一个 DAG 以处理该时期在集合中创建的所有文档。这遵循 Airflow 的预期工作方式,但问题是,如果任务无法处理单个文档,则任何相关任务都无法处理任何其他文档。此外,如果一个文档比其他文档花费更长的时间来处理一个任务,那么那些其他文档正在等待该单个文档继续沿着 DAG 向下。
有比 Airflow 更好的方法吗?还是有比我目前看到的两种方法更好的方法在 Airflow 中处理这个问题?
python - 如何处理完美的任务失败并使用 on_failure 参数返回 SUCCESS?
我有一个Flow
in prefect
which 和 atask
的输出是 a dataframe
。在下面提供的示例中,它总是失败。我希望task
返回一个空dataframe
的状态为SUCCESS
using @task(on_failure=handle_task_fail)
。实现此目的的正确语法是什么?
追溯:
有些地方我搜索了状态处理程序,使用状态处理程序进行记录
python - How does one update a prefect Flow with duplicate parameters?
Using prefect, I would like to create a new flow from two other flows.
The error I am getting is A task with the slug "add_num" already exists in this flow.
Is it possible to update Flows
that use the same tasks
or Parameters
. Below is a minimal example of what I am trying to accomplish.
`
I did see this piece of code on the slack channel that might tie into solving this problem but I am not sure how to use it.
Thanks in advance.
google-cloud-platform - 在 Google Cloud 中运行 Prefect flow serverless 的最佳实践
我已经开始在各种项目中使用 Prefect,现在我需要决定在 GCP 上哪种部署策略最有效。最好我想无服务器工作。比较 Cloud Run、Cloud Functions 和 App Engine,我倾向于选择后者,因为它没有超时限制,而其他两个分别有 9 个。15分钟。
我很想听听人们如何无服务器地部署 Prefect 流,这样流被调度/触发以进行批处理,而代理在不使用时会自动缩小。
或者,更经典的方法是在Compute Engine 上部署 Prefect 并通过 Cloud Scheduler 进行调度。但我觉得这有点过时了,对 Prefect 的功能和未来开发的灵活性不公平。
prefect - 如何使用完美的“映射”来并行化嵌套循环
我试图弄清楚如何使用完美并行化嵌套循环,这需要在外部扇出/输入内进行内部扇出/输入。
如果我map()
在外循环上使用,然后将外循环计算的一些结果也map()
用于内循环输入unmapped()
,这会产生内扇出吗?那么我该如何做一个“部分减少”以在外循环中使用内循环结果呢?
一个指针或例子将不胜感激。
python - 如何在完美中为 control_flow 分配名称?
您如何将 a 分配给流程name
中的merge
任务?
最终目标是merge
使用检索flow.get_tasks(name="thing_merger")
提前致谢。
docker - 如何在 Prefect 流程中使用自定义 docker 存储?
我已经设置了一个Dask
集群,我很乐意向Prefect
它发送基本流程。现在我想做一些更有趣的事情,并使用我的 python 库获取自定义 docker 映像,并在 dask 集群上执行流/任务。
我的假设是我可以离开 dask 集群(调度程序和工作人员),因为它们使用自己的 python 环境(在检查所有各种消息传递库后到处都有匹配的版本)。也就是说,如果 Flow 在我的 custom 中执行,我不希望将我的库添加到那些机器上storage
。但是,要么我没有正确设置存储,要么假设上述情况是不安全的。换句话说,也许在我的自定义库中腌制对象时,Dask 集群确实需要了解我的 python 库。假设我有一些通用的 python 库,叫做data
...
谁能解释这种基于 docker 的工作流程如何/是否应该工作?
docker - `docker run` 作为 Prefect 任务
我应该在 Prefect 流程中作为任务运行的实际工作负载都打包为 docker 映像。所以一个流程基本上只是“运行这个容器,然后运行那个容器”。
但是,我找不到任何关于如何轻松启动 docker 容器作为任务的示例。基本上,我只需要docker run
一个流程。
我知道https://docs.prefect.io/api/latest/tasks/docker.htmlCreateContainer
并尝试了and的各种组合StartContainer
,但没有任何运气。