问题标签 [airflow-2.x]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
46 浏览

airflow - 如何在气流 dag 中的两种不同模式之间切换?

我正在为看起来很简单的事情而苦苦挣扎,但我不知道该怎么做。我们的 PRE 环境中有 2 个不同的模式。如何在气流中的 2 个模式之间切换?想象一下,我只有一个 dag 。bash 中的相同场景是手动执行带有 env 参数(mstr_new、mstr_pre)的 .sh。

现在要在 Airflow 中实现相同的功能,我们将如何指定必须执行的模式?

在此处输入图像描述

0 投票
1 回答
392 浏览

airflow - KubernetesExecutor 的气流:记录的 pid 与当前的 pid.airflow 不匹配。获取 AirflowException:任务收到 SIGTERM 信号

我正在使用 Kubernetes:1.21.0 在 KubernetesExecutor 上运行 Airflow:2.1.4

获取 AirflowException:任务收到 SIGTERM 信号

堆栈跟踪:

当任务接收到heartbeat_callback(根据airflow配置中的job_heartbeat_sec),记录的pid和当前的pid不匹配,因为这部分代码被执行了。https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L201-L203

我尝试通过更改源代码在此处打印 run_as_user 的值:

ti.run_as_user = 无 self.task_runner.run_as_user = 气流

我曾尝试更改 job_heartbeat_sec 值,但该任务始终失败。我正在运行调度程序,网络服务器作为气流用户。DAG 也由气流用户触发。我尝试将 DAG 定义中的 run_as_user 设置为 None 或气流,但同样的错误。

从 Docker 文件创建 Airflow 用户:

气流配置

0 投票
1 回答
36 浏览

google-cloud-platform - Google Cloud Airflow Composer 产生磁盘成本“WHY”还是“FROM WHERE”?

我们出现了问题。当我们在谷歌云平台上创建气流作曲家时。磁盘价格出现在计费报告中。但我们不知道“为什么我们有成本或从哪个工具产生这个成本?”

如果您对此问题有任何想法,请发表评论。

0 投票
0 回答
492 浏览

airflow - Airflow2 gitSync DAG 适用于气流命名空间,但不适用于备用命名空间

我正在运行 minikube 以使用 Apache Airflow2 进行开发。我正在尝试从 GitLab 上的私人仓库同步我的 DAG,但为了让一个基本示例正常工作,我后退了几步。在默认的“airflow”命名空间的情况下,它可以工作,但是当在非默认命名空间中使用完全相同的文件时,它不会。

我有一个 values.yaml 文件,其中包含以下部分:

如果我运行helm upgrade --install airflow apache-airflow/airflow -f values.yaml -n airflow, 然后kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow,我会按预期在 http://localhost:8080 处获得完整的 DAG 列表。

但是,如果我运行helm upgrade --install airflow apache-airflow/airflow -f values.yaml -n mynamespace, 然后kubectl port-forward svc/airflow-webserver 8080:8080 --namespace mynamespace,我在 http://localhost:8080 中没有列出任何 DAG。

如果我列出我试图解决这个问题的所有网站,这篇文章会长 10 倍。我做错了什么??

更新:我创建了一个新的命名空间,,test01以防有一些历史被搁置并导致问题。我跑了helm upgrade --install airflow apache-airflow/airflow -f values.yaml -n test01。启动网络服务器并检查,我没有看到登录屏幕,但它直接进入通常的网页,也没有显示 dags 列表,但这次在 DAG 页面顶部有一个通知:

调度程序似乎没有运行。
DAG 列表可能不会更新,并且不会安排新任务。

这又是不同的行为(尽管与mynamespace通过 gitSync 显示没有 DAG 的情况相同),尽管它似乎暗示了在这种情况下没有检索 DAG 的原因。如果一切都像以前一样启动并启动,我不明白为什么调度程序没有运行。

奇怪的是,helm show values apache-airflow/airflow --namespace test01 > values2.yaml给出了默认值dags.gitSync.enabled: falsedags.gitSync.repo: https://github.com/apache/airflow.git. 我原以为这应该反映我从 values.yaml 升级/安装的内容:enable = true 和ssh repo fetch。dags.gitSync.enabled: true通过将 values2.yaml 编辑为并重新升级,我的行为没有改变——仍然是关于调度程序没有运行和没有 DAG 的错误说明。

0 投票
1 回答
187 浏览

airflow - 遇到超时,如何使气流传感器成功?

参考这个,气流传感器允许我们在运行下一个任务之前检查一个标准。鉴于用户设置了超时和另一个标志,有没有办法标记成功终止传感器?

在我的用例中,我必须通过传感器检查条件,但只能在我希望 DAG / followings 任务正常运行的特定时间范围内。

0 投票
1 回答
389 浏览

failed-installation - Airflow 2.2.0 安装错误:包版本有冲突的依赖关系 cattrs

尝试按照https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html上的安装步骤操作时出现以下错误

0 投票
1 回答
39 浏览

airflow - 多个任务作为一个组的气流重试

我有一组任务应该作为一个单元运行,从某种意义上说,如果组中的任何任务失败,整个组都应该被标记为失败。

我希望能够在失败时重试该组。

例如,我有一个包含以下任务的 DAG:

我想说那(taskB >> taskC)是一个群体。

如果要么taskB失败taskC,我希望能够重新运行整个组(taskB >> taskC)

0 投票
0 回答
11 浏览

airflow-2.x - 气流:不要为某些错误发送电子邮件

我有一个从 ftp 服务器检查和下载文件的任务。如果找不到文件,它会抛出一个断言并稍后重试。我希望气流不向我发送此特定案例的电子邮件,但仍向我发送有关此 DAG 上的任何其他故障和重试的电子邮件。这可能吗?

如果这有所作为,我正在使用 Composer 1.17.1 (airflow 2.1.2)。

0 投票
2 回答
408 浏览

airflow - 如何在 Airflow MySQL Operator 中使用 jinja 模板

我目前正在 Airflow 的 MySQLOperator 中运行此查询。如何使用 Jinja 模板将区域、s3 存储桶替换为参数?

  • 气流版本:2.0.2
  • 蟒蛇:3.7
0 投票
2 回答
460 浏览

authentication - 通过 API 触发 Airflow DAG

我已经在 EC2 上安装了 Airflow 2.0.1,并使用 PostgreSQL RDS 作为元数据数据库。我想从 Lambda 触发 DAG,因此尝试使用 curl 测试代码,但收到 Unauthorized 作为响应。如果我应该做不同的事情怎么办?

脚步:

  1. 为 lambda 创建用户
  1. 在 shell 上定义变量(用于 lambda 用户、密码和端点 url)
  2. 拨打 curl 电话

我收到的回复是这样的: