问题标签 [apache-airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2100 浏览

airflow - 创建唯一文件名并在所有气流任务中访问该文件

每次气流 dag 运行并从所有任务访问该文件时,我们可以创建唯一的文件名吗?我尝试创建全局变量(output_filename)并为其附加时间戳。但是,当我在任务中访问该文件名时,每个任务都会生成不同的文件名,因为它正在计算每个任务中的时间戳。下面是示例代码:

我们有更多任务需要访问 output_filename。我们如何在所有任务中访问 output_filename 全局变量?

0 投票
1 回答
1189 浏览

docker - 气流:无法将工作日志发送到 S3

我使用 Docker 镜像在我的 kubernetes 集群上部署了 Airflow 网络服务器、调度程序、worker 和 Flower。气流版本是 1.8.0。

现在我想将工作日志发送到 S3 和

  1. 从 Admin UI 创建 Airflow 的 S3 连接(只需设置S3_CONN为 conn id,s3作为类型。因为我的 kubernetes 集群在 AWS 上运行并且所有节点都有 S3 访问角色,应该足够了)
  2. 如下设置气流配置 remote_base_log_folder = s3://aws-logs-xxxxxxxx-us-east-1/k8s-airflow remote_log_conn_id = S3_CONN encrypt_s3_logs = False

首先我尝试创建一个 DAG,以便它在运行后立即引发异常。这行得通,可以在 S3 上看到日志。

所以我进行了修改,以便 DAG 现在创建一个 EMR 集群并等待它准备好(等待状态)。为此,我重新启动了所有 4 个 docker 容器的气流。

现在 DAG 看起来正在运行,集群已启动,一旦准备就绪,DAG 将标记为成功。但我在 S3 上看不到任何日志。

工作人员和 Web 服务器上没有相关的错误日志,所以我什至看不到可能导致此问题的原因。只是没有发送日志。

除了官方文档中的描述外,有谁知道 Airflow 的远程记录是否有一些限制? https://airflow.incubator.apache.org/configuration.html#logs

在 Airflow Web UI 中,本地日志优先于远程日志。如果无法找到或访问本地日志,则会显示远程日志。请注意,仅在任务完成(包括失败)后才会将日志发送到远程存储。换句话说,运行任务的远程日志不可用。

我没想到,但成功后,日志不会发送到远程存储吗?

0 投票
1 回答
1505 浏览

workflow - 如何在特定目录上手动运行 Airflow DAG

我正在评估 Airflow 是否适合我的需求(在生物信息学方面)。我在使用 Airflow 模型时遇到了一些困难。具体来说:

  • DAG 文件实际在哪里执行?它的背景是什么?如何将输入数据传递到 DAG 定义文件中?(例如,我想为目录中的每个文件创建一个任务。)
  • 如何临时执行 DAG?如何为 DAG 构造传递参数?

这是我想要执行的示例。假设我刚刚收到一些数据作为目录,其中包含某些共享文件系统中可用的 20 个文件。我想执行一个 DAG 管道,它对 20 个文件中的每一个文件运行一个特定的 bash 命令,然后组合一些结果并执行进一步的处理。DAG 需要文件系统上的路径,还需要列出目录中的文件,以便为每个文件构建一个任务。

XCom只要我可以预先动态构建整个 DAG,我可能不需要将元数据从一个任务传递到另一个任务(我理解这是可能的)。但我不清楚如何通过 DAG 构建路径。

换句话说,我希望我的 DAG 定义包括类似

input_path当我想手动触发 DAG 时如何传入?

我也不需要 cron 样式的调度。

0 投票
2 回答
77 浏览

cron - 不同日期不同时间的 Cron 选项卡调度

我需要在周一至周四晚上 7 点安排工作,周五我需要在晚上 11 点安排工作。我正在使用 Airflow 并且需要 cron 制表符,例如

0 19 * * 周一至周四

欢迎任何建议。

谢谢

问候, CJ

0 投票
1 回答
5924 浏览

airflow - 每月日期和时间的气流 DAG 调度

我们一直在将我们的 cron 作业转换为 Airflow DAG,我很难弄清楚 DAG 的调度在 Airflow 中是如何工作的。一些 DAG 需要在一天中的特定时间(即早上 7 点)运行,而其他 DAG 需要在每月的特定日期/时间(即每月 15 日早上 6 点)运行。

一般来说,Airflow 似乎每天都在正确运行 DAG。因此,schedule_interval = '0 7 * * *每天'start_date': datetime(2017,4,7)早上 7 点运行。

但是,对于每月 DAG (schedule_interval = '0 6 15 * *''start_date': datetime(2017,4,7)),它在 4 月 15 日早上 6 点运行,但从那时起就没有运行过。我尝试每月安排的其他 DAG 在第一个月后同样无法运行。

Airflow关于调度的文档是,IMO,浑浊,对其他SO问题的回答让我更加困惑。我希望有人能澄清我的理解和我试图每月安排的 DAG 出了什么问题。

0 投票
2 回答
1357 浏览

airflow - 气流 - 是否可以使用 backfill 命令一次(按顺序)运行一天?

基本上,我想运行 backfill 命令整整一个月。但是有一些任务依赖于前一天的数据。据我所知,这个命令每天都在同一时间运行。

有没有办法让回填命令一次运行一天(按顺序)?

干杯。

0 投票
6 回答
12364 浏览

airflow - 气流以编程方式取消暂停dag?

我有一个 dag,我们将部署到多个不同的气流实例,并且在 airflow.cfg 中我们有dags_are_paused_at_creation = True,但是对于这个特定的 dag,我们希望无需通过单击 UI 手动打开它。有没有办法以编程方式做到这一点?

0 投票
2 回答
15627 浏览

airflow - 来自cli的气流传递参数

有没有办法将参数传递给:

?

我有一个监视文件目录的脚本 - 当文件移动到目标目录时,我想触发作为参数传递文件路径的 dag。

0 投票
4 回答
11207 浏览

python - 气流:将动态值传递给子 DAG 运算符

我是气流新手。
我遇到了一个场景,其中父 DAG 需要将一些动态数字(比如说n)传递给子 DAG。
SubDAG 将使用此数字来动态创建n并行任务。

气流文档没有涵盖实现这一目标的方法。所以我探索了几种方法:

选项 - 1(使用 xcom 拉取)

我试图作为 xcom 值传递,但由于某种原因,SubDAG 没有解析为传递的值。

父 Dag 文件

子日期文件

选项 - 2

我也尝试将number_of_runs其作为全局变量传递,但它不起作用。

选项 - 3

我们还尝试将此值写入数据文件。但子 DAG 正在抛出File doesn't exist error。这可能是因为我们正在动态生成这个文件。

有人可以帮我弄这个吗。

0 投票
2 回答
17531 浏览

airflow - Dags和任务的气流结构/组织

我的问题: