问题标签 [prefect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
5 回答
720 浏览

airflow - 每个数据的气流 Dagrun 而不是预定的

我当前面临的问题是我在 MongoDB 集合中有文档,每个文档都需要由需要在非循环依赖图中运行的任务处理和更新。如果上游任务未能处理文档,则没有任何相关任务可以处理该文档,因为该文档尚未使用先决条件信息进行更新。

如果我要使用 Airflow,这给我留下了两个解决方案:

  1. 为每个文档触发一个 DAG,并使用 传递文档 ID --conf。问题在于这不是 Airflow 的预期使用方式。我永远不会运行预定的流程,根据文档在集合中的显示方式,我每天会制作 1440 个 Dagruns。

  2. 每个时期运行一个 DAG 以处理该时期在集合中创建的所有文档。这遵循 Airflow 的预期工作方式,但问题是,如果任务无法处理单个文档,则任何相关任务都无法处理任何其他文档。此外,如果一个文档比其他文档花费更长的时间来处理一个任务,那么那些其他文档正在等待该单个文档继续沿着 DAG 向下。

有比 Airflow 更好的方法吗?还是有比我目前看到的两种方法更好的方法在 Airflow 中处理这个问题?

0 投票
1 回答
797 浏览

python - 将参数化 Prefect 流程与随参数变化的计划相结合

我希望能够重复使用相同的参数化Prefect流程,但时间表因输入而异。例如:

有什么建议么?

0 投票
1 回答
1668 浏览

python - 如何处理完美的任务失败并使用 on_failure 参数返回 SUCCESS?

我有一个Flowin prefectwhich 和 atask的输出是 a dataframe。在下面提供的示例中,它总是失败。我希望task返回一个空dataframe的状态为SUCCESSusing @task(on_failure=handle_task_fail)。实现此目的的正确语法是什么?

追溯:

有些地方我搜索了状态处理程序使用状态处理程序进行记录

0 投票
1 回答
628 浏览

python - How does one update a prefect Flow with duplicate parameters?

Using prefect, I would like to create a new flow from two other flows.

enter image description here enter image description here

The error I am getting is A task with the slug "add_num" already exists in this flow. Is it possible to update Flows that use the same tasks or Parameters. Below is a minimal example of what I am trying to accomplish. `

I did see this piece of code on the slack channel that might tie into solving this problem but I am not sure how to use it.

Thanks in advance.

0 投票
3 回答
2030 浏览

google-cloud-platform - 在 Google Cloud 中运行 Prefect flow serverless 的最佳实践

我已经开始在各种项目中使用 Prefect,现在我需要决定在 GCP 上哪种部署策略最有效。最好我想无服务器工作。比较 Cloud Run、Cloud Functions 和 App Engine,我倾向于选择后者,因为它没有超时限制,而其他两个分别有 9 个。15分钟。

我很想听听人们如何无服务器地部署 Prefect 流,这样流被调度/触发以进行批处理,而代理在不使用时会自动缩小。

或者,更经典的方法是在Compute Engine 上部署 Prefect 并通过 Cloud Scheduler 进行调度。但我觉得这有点过时了,对 Prefect 的功能和未来开发的灵活性不公平。

0 投票
1 回答
396 浏览

prefect - 如何使用完美的“映射”来并行化嵌套循环

我试图弄清楚如何使用完美并行化嵌套循环,这需要在外部扇出/输入内进行内部扇出/输入。

如果我map()在外循环上使用,然后将外循环计算的一些结果也map()用于内循环输入unmapped(),这会产生内扇出吗?那么我该如何做一个“部分减少”以在外循环中使用内循环结果呢?

一个指针或例子将不胜感激。

0 投票
1 回答
81 浏览

python - 如何在完美中为 control_flow 分配名称?

您如何将 a 分配给流程name中的merge任务?

最终目标是merge使用检索flow.get_tasks(name="thing_merger")

提前致谢。

0 投票
1 回答
906 浏览

docker - 如何在 Prefect 流程中使用自定义 docker 存储?

我已经设置了一个Dask集群,我很乐意向Prefect它发送基本流程。现在我想做一些更有趣的事情,并使用我的 python 库获取自定义 docker 映像,并在 dask 集群上执行流/任务。

我的假设是我可以离开 dask 集群(调度程序和工作人员),因为它们使用自己的 python 环境(在检查所有各种消息传递库后到处都有匹配的版本)。也就是说,如果 Flow 在我的 custom 中执行,我不希望将我的库添加到那些机器上storage。但是,要么我没有正确设置存储,要么假设上述情况是不安全的。换句话说,也许在我的自定义库中腌制对象时,Dask 集群确实需要了解我的 python 库。假设我有一些通用的 python 库,叫做data...

谁能解释这种基于 docker 的工作流程如何/是否应该工作?

0 投票
1 回答
1123 浏览

docker - `docker run` 作为 Prefect 任务

我应该在 Prefect 流程中作为任务运行的实际工作负载都打包为 docker 映像。所以一个流程基本上只是“运行这个容器,然后运行那个容器”。

但是,我找不到任何关于如何轻松启动 docker 容器作为任务的示例。基本上,我只需要docker run一个流程。

我知道https://docs.prefect.io/api/latest/tasks/docker.htmlCreateContainer并尝试了and的各种组合StartContainer,但没有任何运气。

0 投票
1 回答
623 浏览

python-3.x - 无法使用不同的参数注册完美的流程

我正在尝试使用不同的参数来实现完美的流程:

但我收到以下错误:

环境:

  • 蟒蛇 3.6.9
  • 完美 0.12.0

你知道是什么导致了这个错误吗?