问题标签 [airflow-2.x]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
393 浏览

postgresql - How to pass the result from first query to the second one with PostgresOperator in airflow 2.x?

I have to pass the result of my first redshift query to the second one. I am using postgres operator, Postgre script. doesn't have any return function as you see in this link

Actually I thought to modify the script and add return to the execute method. But the point is that I do not use the execute method and for executing the sql script I am using this:

Here are my two queries:

and with the retrieve data (which is a list) , I have to pass it to the second script:

I thought that using xcom could be a good solution, as I don't return many rows. But I don't know how to use it with Postgres.

I don't want to use the temporal table, as I believe that for that small volume I don't need.

I ll appreciate your help.

0 投票
1 回答
242 浏览

airflow - Airflow 2.0 问题:气流监控任务过多

我使用 docker swarm 和 Celery Executor 安装了气流 2.0。
1 周后,芹菜工人的内存溢出airflow task supervisor(附截图)
有人遇到过这样的问题吗?有什么建议么 ?

在此处输入图像描述

0 投票
1 回答
842 浏览

airflow - Airflow 2.0.2 如何使用 xcom 在 postgres 任务中传递参数?

我正在尝试以动态方式传递 postgres 运算符中的参数。

为了刷新元数据,有两个任务,

  1. 获取 id 列表 (get_query_id_task)

  2. 传递 id 列表以获取并执行查询 (get_query_text_task)

Xcom push 返回查询列表如下:

我使用插件来渲染 xcom 推送:

第一个任务的模板渲染: 在此处输入图像描述

呈现的模板未传递参数 在此处输入图像描述

Xcom 推送值,是元组列表 在此处输入图像描述

使用下面提供的解决方案已解决了该问题。但是我无法在 id 列表上进行循环。所以它只是向我显示一个ID。而且我不确定如何循环遍历 id。

这是日志:

0 投票
2 回答
101 浏览

airflow - Airflow 中的任务之间的延迟或任何其他选项?

我们正在使用气流 2.00。我正在尝试实现一个做两件事的 DAG:

  1. 通过 API 触发报告
  2. 将报告从源下载到目标。

任务 1 和 2 之间至少需要 2-3 小时的间隔。根据我的研究,我有两个选择

  1. 两个任务的两个 DAG。安排与第一个 DAG 相隔两小时的第二个 DAG
  2. 此处提到的两个任务之间的延迟

这两个选项之间是否存在偏好。Airflow 2.0 有第三种选择吗?请指教。

0 投票
1 回答
766 浏览

python - 从 BashOperator 到 SSHOperator 的 Airflow XCOM 通信

刚开始学习Airflow,但是对Xcom的概念掌握起来还是比较困难的。因此我写了一个这样的dag:

第一个任务运行成功,但是我无法从 task 获取 XCom return_value Read_my_IP,这是本地机器的 IP 地址。这可能是非常基本的,但文档没有提到如何声明task_instance.

请帮助完成 Xcom 流程并将 IP 地址从本地计算机传递到远程计算机以进行进一步的过程。

0 投票
1 回答
898 浏览

python - Airflow 中外部连接的连接池

我正在尝试为在 Airflow 中创建的外部连接找到一种连接池管理方法。
气流版本:2.1.0
Python 版本:3.9.5
气流数据库:SQLite
创建外部连接:MySQL 和雪花

我知道airflow.cfg 文件中有属性

但是这些属性用于管理气流内部数据库,在我的例子中是 SQLite。

我几乎没有在 MySQL 和 Snowflake 中读取或写入数据的任务。

从 MySQL 读取数据

我观察到的是,正在为 Snowflake 的每个任务(同一个 dag 中有多个任务)创建一个新会话,但尚未验证 MySQL 的相同。

有没有办法维护外部连接的连接池(在我的情况下是雪花和 MySQL)或任何其他方式在同一会话中运行同一 DAG 中的所有查询?

谢谢

0 投票
1 回答
375 浏览

java - 从 Airflow 调用 Java 类或方法

我想从 Airflow 任务中调用几个 java 方法,因为我已经有很多用 java 编写的代码,包括验证、清理、一些业务逻辑,我不想再次用 Python 重写它。

我正在使用气流 2.1.0

我知道有一种方法可以使用 BashOperator 调用 jar

但是有什么方法可以让我直接从任务本身调用任何特定的java类或方法,否则我必须为我的每个逻辑创建许多小罐子。
提前致谢。

0 投票
1 回答
299 浏览

airflow - Airflow CLI:如何在 Airflow 1.10.12 中获取 dag 任务的状态?

在 Airflow 2.0 中,您可以通过运行 CLI 命令获取 dag 中的任务状态:airflow tasks states-for-dag-run. (请参阅此处的文档:https ://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#state_repeat1 )

Airflow 1.10.12 中的等价物是什么?我似乎在1.10.12 文档中找不到它。

0 投票
1 回答
848 浏览

kubernetes - Kubernetes Pod 因 CrashLoopBackoff 而失败,即使 Airflow 2.0 中的退出代码为 0

我正在将 Airflow 从版本 1.10 升级到 2.1.0。我的项目用于KubernetesPodOperatorKubernetesExecutor. 在 Airflow 1.10 中一切正常。但是当我升级 Airflow 2.1.0 时,Pod 能够运行任务,并且在成功完成后,它会以CrashLoopBackoff状态重新启动。我已经检查过了livenessProbe,它按预期工作。我检查了其他日志,但在指定的任何容器或 pod 中都找不到任何问题。

部署.yaml 文件:

描述吊舱:

Worker pod 列表和日志

0 投票
1 回答
106 浏览

python - Airflow dag 级别重试值会覆盖任务级别重试值吗?

我注意到一个无主机错误与在任务文件夹中找不到日志相结合。我的任务的重试次数设置为 3。此外,dag 的重试次数设置为 1。在 dag 级别设置的重试值是否会覆盖任务重试?我注意到它在失败后没有重试。