问题标签 [apache-airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
41692 浏览

bash - 如何设置 Airflow 的电子邮件配置以发送有关错误的电子邮件?

我试图通过传入thisshouldnotrun不起作用的 Bash 行 ( ) 来故意使 Airflow 任务失败并出错。气流正在输出以下内容:

Airflow 会针对此类错误发送电子邮件吗?如果没有,为这些错误发送电子邮件的最佳方式是什么?

我什至不确定是否airflow.cfg设置正确......由于最终目标是测试电子邮件警报通知,我想确保正确设置airflow.cfg。这是设置:

是什么smtp_starttls?我在文档或在线中找不到任何信息。如果我们需要 2 因素身份验证来查看电子邮件,这对 Airflow 来说会是个问题吗?

这是我的 Bash 命令:

我的任务:

0 投票
1 回答
326 浏览

bash - 如何使用 Bash 引发错误?

我试图在包含在 Python Airflow 脚本中的块中引发 Bash 错误。

基本上,调用该snowsql函数的最后一行将始终返回退出状态 0,即使我删除了重定向片段。所以这就是我重定向到那个error_dev.log 文件的原因。

基本上,我想添加一个if块,如果文件存在,则在其中手动引发错误。我试过添加exit 64,但 Airflow 似乎不喜欢这样。是否使用exit ##最容易在 Bash 中引发错误?不知道 Airflow 甚至期望非常诚实。

0 投票
5 回答
10882 浏览

slack - 气流失败松弛消息

如何配置 Airflow,以便 DAG 中的任何故障都会(立即)导致松弛消息?

此时我通过创建一个 slack_failed_task 来管理它:

并将此任务 (one_failed) 设置为 DAG 中其他任务的上游:

它可以工作,但很容易出错,因为忘记添加任务会跳过松弛通知,看起来工作量很大。

有没有办法扩展email_on_failureDAG 中的属性?

奖励 ;-) 包括将失败任务的名称传递给消息的方法。

0 投票
0 回答
471 浏览

airflow - 如何在 Airflow 中定义具有用户交互的有状态 DAG

我刚刚第一次启动 Airflow 并试图弄清楚它是否允许运行带有过滤器的“管道”,其中过滤器可以是 shell 任务或微服务。

考虑以下场景:

  • 考虑一个有注册用户的门户网站
  • 此门户的用户能够启动工作流“W1”和“W2”。这也意味着工作流需要在用户之间隔离运行,并且需要状态持久性。
  • 让我们假设工作流程 W1 是完全自动化的(我不会详细说明)。所以这是一项正常的任务,可以开始并在某个时候完成。
  • 然而,工作流 W2 在工作流中的某些点需要用户交互。假设工作流 W2 包括三个步骤:

    1) 预处理数据

    2) 通过 REST API 公开数据,并允许(拥有工作流的用户)更改数据。这可能是一个循环,因此用户可能会多次更改数据,直到工作流继续到下一步。

    3) 后处理数据

到目前为止,我有三个无法通过文档或示例解决的问题:

Q1:如何在 Airflow 中定义一个 DAG,以便每个用户单独启动此类工作流 w2?

Q2:如何在循环等待用户交互时“暂停并继续”执行?

Q3:如何持久化工作流,让它永远不会超时,可以随时继续?

欢迎使用小代码示例(我也是 Python 新手)!

0 投票
1 回答
1917 浏览

airflow - 运行时添加到 DAG 的任务调度失败

我的想法是foo创建一个生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。

所以,理想情况下,我的 DAG 应该是这样的: 在此处输入图像描述

这里唯一的变量是生成的任务数。在所有这些都完成后,我想再做一些任务,因此为每项任务设置一个新的 DAG 似乎并不合适。

这是我的代码:

在日志中,我可以看到gen_nodes正确执行(即Iterating worker 5等)。但是,新任务没有被安排,也没有证据表明它们已被执行。

我在网上找到了相关的代码示例,例如 this,但无法使其工作。我错过了什么吗?

或者,是否有更合适的方法来解决这个问题(隔离工作单元)?

0 投票
1 回答
400 浏览

airflow - 运行以安排 DAG 的 Airflow 命令的正确顺序是什么?

我不明白需要运行哪些命令才能安排 DAG。假设我使用 DAG 测试了 DAG airflow test dag_name task_id_1 2017-06-22,第二个任务使用airflow test dag_name task_id_2 2017-06-22.

我跑了airflow trigger_dag dag_name,但那是为了在那一刻实例化 DAG 吗?

假设我希望dag_name's 时间/调度看起来像:

所以我只想在今天和明天安排并运行它,从今天 18:00 UTC 开始,之后 24 小时。

现在我应该运行什么命令或命令列表?airflow scheduler每次我想添加和安排 DAG 时都必须运行吗?

0 投票
5 回答
26308 浏览

linux - 如何停止/终止以守护程序模式启动的气流调度程序

我和气流新手,不小心在守护程序模式下启动了气流调度程序。现在,我想杀死调度程序并可能重新启动它。我试着做

什么都没有发生。当我跑

我看到这些条目:

...依此类推,35 行具有不同的 pid。

关于如何在不重新启动机器的情况下停止/终止气流调度程序的任何建议。我还检查了调度程序的 pid 文件并尝试杀死该 pid 但没有效果。

任何帮助表示赞赏。谢谢!

0 投票
2 回答
6492 浏览

python-2.7 - 由于上游任务失败,气流无法运行 DAG

我正在尝试使用 Apache Airflow 创建工作流。所以基本上我已经在服务器中我自己的 anaconda 内核中手动安装了 Airflow。

这是我运行简单 DAG 的方式

当我使用气流测试做同样的事情时,它独立地完成了特定的任务。例如在 dag1 中:task1 >> task2

我想它会先运行task1,然后运行task2。但它只是独立运行task2。

你们对此有什么想法吗?非常感谢您!

这是我的代码:

气流:v1.8.0 使用带有 SQLLite 的执行器 SequentialExecutor

这是错误消息:

0 投票
1 回答
1735 浏览

python - 气流中的辅助功能

气流中的辅助功能应该放在哪里?它应该添加到插件目录中吗?我将为我的 ETL 作业编写一些可重复的代码,如下所示:

问题是我会遇到如下错误:

0 投票
2 回答
1809 浏览

airflow - 如何在 Airflow dag 中途成功退出任务?

我有一个检查 FTP 服务器上的文件的 dag(气流在单独的服务器上运行)。如果文件存在,则将文件移动到 S3(我们在此处存档)。从那里,文件名被传递给 Spark 提交作业。spark 作业将通过 S3(不同服务器上的 spark 集群)处理文件。我不确定我是否需要有多个 dag,但这是流程。我要做的是仅在 S3 存储桶中存在文件时才运行 Spark 作业。

我尝试使用 S3 传感器,但在满足超时标准后失败/超时,因此整个 dag 设置为失败。

我只想在一个或多个文件移动到 S3 时执行 FTP 检查的脚本之后运行所有内容。