问题标签 [airflow-2.x]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
postgresql - How to pass the result from first query to the second one with PostgresOperator in airflow 2.x?
I have to pass the result of my first redshift query to the second one. I am using postgres operator, Postgre script. doesn't have any return function as you see in this link
Actually I thought to modify the script and add return to the execute method. But the point is that I do not use the execute method and for executing the sql script I am using this:
Here are my two queries:
and with the retrieve data (which is a list) , I have to pass it to the second script:
I thought that using xcom could be a good solution, as I don't return many rows. But I don't know how to use it with Postgres.
I don't want to use the temporal table, as I believe that for that small volume I don't need.
I ll appreciate your help.
airflow - Airflow 中的任务之间的延迟或任何其他选项?
我们正在使用气流 2.00。我正在尝试实现一个做两件事的 DAG:
- 通过 API 触发报告
- 将报告从源下载到目标。
任务 1 和 2 之间至少需要 2-3 小时的间隔。根据我的研究,我有两个选择
- 两个任务的两个 DAG。安排与第一个 DAG 相隔两小时的第二个 DAG
- 此处提到的两个任务之间的延迟
这两个选项之间是否存在偏好。Airflow 2.0 有第三种选择吗?请指教。
python - 从 BashOperator 到 SSHOperator 的 Airflow XCOM 通信
刚开始学习Airflow,但是对Xcom的概念掌握起来还是比较困难的。因此我写了一个这样的dag:
第一个任务运行成功,但是我无法从 task 获取 XCom return_value Read_my_IP
,这是本地机器的 IP 地址。这可能是非常基本的,但文档没有提到如何声明task_instance
.
请帮助完成 Xcom 流程并将 IP 地址从本地计算机传递到远程计算机以进行进一步的过程。
python - Airflow 中外部连接的连接池
我正在尝试为在 Airflow 中创建的外部连接找到一种连接池管理方法。
气流版本:2.1.0
Python 版本:3.9.5
气流数据库:SQLite
创建外部连接:MySQL 和雪花
我知道airflow.cfg 文件中有属性
但是这些属性用于管理气流内部数据库,在我的例子中是 SQLite。
我几乎没有在 MySQL 和 Snowflake 中读取或写入数据的任务。
和
从 MySQL 读取数据
我观察到的是,正在为 Snowflake 的每个任务(同一个 dag 中有多个任务)创建一个新会话,但尚未验证 MySQL 的相同。
有没有办法维护外部连接的连接池(在我的情况下是雪花和 MySQL)或任何其他方式在同一会话中运行同一 DAG 中的所有查询?
谢谢
java - 从 Airflow 调用 Java 类或方法
我想从 Airflow 任务中调用几个 java 方法,因为我已经有很多用 java 编写的代码,包括验证、清理、一些业务逻辑,我不想再次用 Python 重写它。
我正在使用气流 2.1.0
我知道有一种方法可以使用 BashOperator 调用 jar
但是有什么方法可以让我直接从任务本身调用任何特定的java类或方法,否则我必须为我的每个逻辑创建许多小罐子。
提前致谢。
airflow - Airflow CLI:如何在 Airflow 1.10.12 中获取 dag 任务的状态?
在 Airflow 2.0 中,您可以通过运行 CLI 命令获取 dag 中的任务状态:airflow tasks states-for-dag-run
. (请参阅此处的文档:https ://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#state_repeat1 )
Airflow 1.10.12 中的等价物是什么?我似乎在1.10.12 文档中找不到它。
kubernetes - Kubernetes Pod 因 CrashLoopBackoff 而失败,即使 Airflow 2.0 中的退出代码为 0
我正在将 Airflow 从版本 1.10 升级到 2.1.0。我的项目用于KubernetesPodOperator
在KubernetesExecutor
. 在 Airflow 1.10 中一切正常。但是当我升级 Airflow 2.1.0 时,Pod 能够运行任务,并且在成功完成后,它会以CrashLoopBackoff
状态重新启动。我已经检查过了livenessProbe
,它按预期工作。我检查了其他日志,但在指定的任何容器或 pod 中都找不到任何问题。
部署.yaml 文件:
描述吊舱:
python - Airflow dag 级别重试值会覆盖任务级别重试值吗?
我注意到一个无主机错误与在任务文件夹中找不到日志相结合。我的任务的重试次数设置为 3。此外,dag 的重试次数设置为 1。在 dag 级别设置的重试值是否会覆盖任务重试?我注意到它在失败后没有重试。