问题标签 [apache-airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
bash - 如何设置 Airflow 的电子邮件配置以发送有关错误的电子邮件?
我试图通过传入thisshouldnotrun
不起作用的 Bash 行 ( ) 来故意使 Airflow 任务失败并出错。气流正在输出以下内容:
Airflow 会针对此类错误发送电子邮件吗?如果没有,为这些错误发送电子邮件的最佳方式是什么?
我什至不确定是否airflow.cfg
设置正确......由于最终目标是测试电子邮件警报通知,我想确保正确设置airflow.cfg。这是设置:
是什么smtp_starttls
?我在文档或在线中找不到任何信息。如果我们需要 2 因素身份验证来查看电子邮件,这对 Airflow 来说会是个问题吗?
这是我的 Bash 命令:
我的任务:
bash - 如何使用 Bash 引发错误?
我试图在包含在 Python Airflow 脚本中的块中引发 Bash 错误。
基本上,调用该snowsql
函数的最后一行将始终返回退出状态 0,即使我删除了重定向片段。所以这就是我重定向到那个error_dev.log 文件的原因。
基本上,我想添加一个if
块,如果文件存在,则在其中手动引发错误。我试过添加exit 64
,但 Airflow 似乎不喜欢这样。是否使用exit ##
最容易在 Bash 中引发错误?不知道 Airflow 甚至期望非常诚实。
slack - 气流失败松弛消息
如何配置 Airflow,以便 DAG 中的任何故障都会(立即)导致松弛消息?
此时我通过创建一个 slack_failed_task 来管理它:
并将此任务 (one_failed) 设置为 DAG 中其他任务的上游:
它可以工作,但很容易出错,因为忘记添加任务会跳过松弛通知,看起来工作量很大。
有没有办法扩展email_on_failure
DAG 中的属性?
奖励 ;-) 包括将失败任务的名称传递给消息的方法。
airflow - 如何在 Airflow 中定义具有用户交互的有状态 DAG
我刚刚第一次启动 Airflow 并试图弄清楚它是否允许运行带有过滤器的“管道”,其中过滤器可以是 shell 任务或微服务。
考虑以下场景:
- 考虑一个有注册用户的门户网站
- 此门户的用户能够启动工作流“W1”和“W2”。这也意味着工作流需要在用户之间隔离运行,并且需要状态持久性。
- 让我们假设工作流程 W1 是完全自动化的(我不会详细说明)。所以这是一项正常的任务,可以开始并在某个时候完成。
然而,工作流 W2 在工作流中的某些点需要用户交互。假设工作流 W2 包括三个步骤:
1) 预处理数据
2) 通过 REST API 公开数据,并允许(拥有工作流的用户)更改数据。这可能是一个循环,因此用户可能会多次更改数据,直到工作流继续到下一步。
3) 后处理数据
到目前为止,我有三个无法通过文档或示例解决的问题:
Q1:如何在 Airflow 中定义一个 DAG,以便每个用户单独启动此类工作流 w2?
Q2:如何在循环等待用户交互时“暂停并继续”执行?
Q3:如何持久化工作流,让它永远不会超时,可以随时继续?
欢迎使用小代码示例(我也是 Python 新手)!
airflow - 运行时添加到 DAG 的任务调度失败
我的想法是foo
创建一个生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。
这里唯一的变量是生成的任务数。在所有这些都完成后,我想再做一些任务,因此为每项任务设置一个新的 DAG 似乎并不合适。
这是我的代码:
在日志中,我可以看到gen_nodes
正确执行(即Iterating worker 5
等)。但是,新任务没有被安排,也没有证据表明它们已被执行。
我在网上找到了相关的代码示例,例如 this,但无法使其工作。我错过了什么吗?
或者,是否有更合适的方法来解决这个问题(隔离工作单元)?
airflow - 运行以安排 DAG 的 Airflow 命令的正确顺序是什么?
我不明白需要运行哪些命令才能安排 DAG。假设我使用 DAG 测试了 DAG airflow test dag_name task_id_1 2017-06-22
,第二个任务使用airflow test dag_name task_id_2 2017-06-22
.
我跑了airflow trigger_dag dag_name
,但那是为了在那一刻实例化 DAG 吗?
假设我希望dag_name
's 时间/调度看起来像:
所以我只想在今天和明天安排并运行它,从今天 18:00 UTC 开始,之后 24 小时。
现在我应该运行什么命令或命令列表?airflow scheduler
每次我想添加和安排 DAG 时都必须运行吗?
linux - 如何停止/终止以守护程序模式启动的气流调度程序
我和气流新手,不小心在守护程序模式下启动了气流调度程序。现在,我想杀死调度程序并可能重新启动它。我试着做
什么都没有发生。当我跑
我看到这些条目:
...依此类推,35 行具有不同的 pid。
关于如何在不重新启动机器的情况下停止/终止气流调度程序的任何建议。我还检查了调度程序的 pid 文件并尝试杀死该 pid 但没有效果。
任何帮助表示赞赏。谢谢!
python-2.7 - 由于上游任务失败,气流无法运行 DAG
我正在尝试使用 Apache Airflow 创建工作流。所以基本上我已经在服务器中我自己的 anaconda 内核中手动安装了 Airflow。
这是我运行简单 DAG 的方式
当我使用气流测试做同样的事情时,它独立地完成了特定的任务。例如在 dag1 中:task1 >> task2
我想它会先运行task1,然后运行task2。但它只是独立运行task2。
你们对此有什么想法吗?非常感谢您!
这是我的代码:
气流:v1.8.0 使用带有 SQLLite 的执行器 SequentialExecutor
这是错误消息:
python - 气流中的辅助功能
气流中的辅助功能应该放在哪里?它应该添加到插件目录中吗?我将为我的 ETL 作业编写一些可重复的代码,如下所示:
问题是我会遇到如下错误:
airflow - 如何在 Airflow dag 中途成功退出任务?
我有一个检查 FTP 服务器上的文件的 dag(气流在单独的服务器上运行)。如果文件存在,则将文件移动到 S3(我们在此处存档)。从那里,文件名被传递给 Spark 提交作业。spark 作业将通过 S3(不同服务器上的 spark 集群)处理文件。我不确定我是否需要有多个 dag,但这是流程。我要做的是仅在 S3 存储桶中存在文件时才运行 Spark 作业。
我尝试使用 S3 传感器,但在满足超时标准后失败/超时,因此整个 dag 设置为失败。
我只想在一个或多个文件移动到 S3 时执行 FTP 检查的脚本之后运行所有内容。