问题标签 [airflow-2.x]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
535 浏览

redis - Airflow 2.0 - 即使池插槽可用,任务仍处于排队状态

蟒蛇依赖:

  • 阿帕奇气流==2.1.3
  • redis==3.5.3
  • 芹菜==4.4.7

气流配置:

  • core.parallelism = 1000
  • core.dag_concurrency = 400
  • core.max_active_runs_per_dag = 1
  • celery.worker_concurrency = 6
  • celery.broker_url = redis
  • celery.result_backend = mysql

升级到气流 2.0 后面临这个问题

并非所有任务都卡在“排队”状态,一次只有 1 个任务卡住(超过 100 个)。

有时任务也停留在“运行”状态。

此问题在 1-2 天内发生一次。

即使池槽可用(槽 = 128,运行槽 = 0,排队槽 = 1)也会发生这种情况

根据工作人员日志(对于卡住的任务和正常任务):

我在下面看到所有正常工作任务的日志,这对于卡住的任务是缺失的:

0 投票
1 回答
111 浏览

apache-spark - 我可以使用 Airflow 启动/停止火花流作业吗

我有两种类型的工作:Spark Ba​​tch 作业和 Spark 流式作业。

我想用气流来安排和管理它们。

用于作业的气流已停止。但我想将它用于我的流媒体工作。谁能给我一些想法或其他工具来完成我的流媒体工作

0 投票
0 回答
86 浏览

airflow - 如何最好地将“ftp to s3 operator”合并到现有的 Airflow DAG 中?希望使用airflow.providers.amazon.aws.transfers.ftp_to_s3

我对 Airflow 比较陌生,正在寻求将 #following #airflow.providers.amazon.aws.transfers.ftp_to_s3 引入我的 DAG 的帮助。#参数(s3_bucket、aws_conn_id、ftp_conn_id 等)如何传递给这个操作符?这些参数会存在于引用的 python 脚本中吗?

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/transfers/ftp_to_s3/index.html

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/transfers/ftp_to_s3.html

0 投票
1 回答
559 浏览

python-3.x - 调度气流任务组抛出 AttributeError

所以我在任务组中创建任务,并试图将它们添加到我的任务序列中,但它抛出了这个错误:

我正在创建我的任务组和这样的任务:

如果我删除任务组并通过删除行将任务组任务从排序中退出

我可以看到 a1、a2、a3 和 a4 的顺序正确,并且我可以断开断开的run_model_task_group任务,但是只要将其添加到顺序中,就会出现上述错误。

谁能指导我这里可能发生的事情? 在此处输入图像描述

请注意,我使用函数获取dagtask_group参数来创建任务组任务,因为我也想为另一个 dag 创建相同的任务集。

0 投票
1 回答
873 浏览

docker - 如何设置气流工作者以允许网络服务器使用 docker 在不同机器上获取日志?

我最近刚刚使用 docker 容器安装了气流 2.1.4,我已经成功地在同一台机器上使用 docker-compose 设置了 postgres、redis、调度程序、2x 本地工作人员和花。

现在我想扩展,并在其他机器上设置工人。

我能够让工作人员启动并运行,flower 能够找到工作节点,工作人员正在从调度程序正确接收任务,但无论任务的结果状态如何,任务都会被标记为失败并显示错误消息如下所示:

然后我尝试替换AIRFLOW__CORE__HOSTNAME_CALLABLE: 'socket.getfqdn'AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'

我收到了这个错误:

然后我尝试将worker的端口8793与其主机(在下面的worker_4中)映射,现在它正在返回:

但有时仍会出现“名称解析暂时失败”错误。

我还尝试复制错误中的 URL,并将 IP 替换为主机 ip,并收到以下消息:

如果需要其他信息,请告诉我。

提前致谢!

下面是调度程序/webserver/flower 的 docker-compose.yml:

和我的 docker-compose.yml 用于另一台机器上的工人:

0 投票
1 回答
65 浏览

airflow - 无法为 Airflow LocalExecutor 设置 MySQL 后端

我需要并行运行 dags,但不需要显着扩展,因此 LocalExecutor 可以很好地完成这项工作。我查看了 Airflow 文档并首先创建了一个 MySQL 数据库:

然后我在airflow.cfg文件中修改以下参数:

当我运行airflow db init时,我遇到以下错误消息:

AttributeError:“MySQLConverter”对象没有属性“_dagruntype_to_mysql”

在处理上述异常的过程中,又发生了一个异常:TypeError: Python 'dagruntype' cannot be convert to a MySQL type

请注意,airflow.cfg文件中没有其他任何内容被更改,并且使用带有 sqlite 的默认 SequentialExecutor 可以让一切正常运行。另请注意,我使用的是 Airflow 版本 2.2.0

0 投票
0 回答
37 浏览

airflow - 如何将 conf 传递给计划的 DAG

当手动触发 DAG 时,有多种方法可以传递配置。它可以从 UI 中完成,通过气流 CLI 使用--conf参数和使用 REST API。

但是,当使用 cron 表达式调度 DAG 时,DAG 总是会失败,因为 DAG 中的任务需要来自conf.

是否有可用于为值设置“默认”值的 DAG 级别配置conf(无需对 Python 代码本身进行空检查并硬编码默认值)

我不想在代码本身中执行此 null 检查的原因是因为我希望conf尽可能通过 Airflow API 公开键和默认值

0 投票
1 回答
48 浏览

airflow - 如何毫不拖延地安排执行?

我有一个问题,我需要每个月的第一天启动一个 DAG,但我有一个问题,DAG 于 10 月 1 日开始,但在 11 月 1 日当天执行,我需要 10 月 1 日执行 10 月 1 日和 11 月 1 日执行 1十一月,不耽误执行一个月。

我的调度程序是:'0 10 1 * *'

谢谢

0 投票
0 回答
67 浏览

python - Apache Airflow:如何编写自定义传感器来检查特定条件并执行操作

我想在 Airflow 2 中编写一个自定义传感器。标准过程是覆盖 poke 方法以检查您想要的条件并在达到时终止任务。但我想要的是让我的传感器最初执行操作,然后休眠并检查特定条件。

我如何通过覆盖 poke 方法来做到这一点。我要执行的操作必须只发生一次,但在每个戳间隔期间,戳方法都会运行以检查条件。我如何在不覆盖执行方法的情况下实现这一点?

0 投票
1 回答
331 浏览

docker - Airflow DockerOperator 挂载导致基于 docker-compose 的 Airflow 设置出错

我正在使用 docker-compose 和 celery 执行器运行 Airflow 2.1.4。到目前为止,我已经能够从 celery 工作容器启动和运行简单的 DockerOperator 任务,但是现在当我尝试将目录从共享驱动器挂载到任务容器时,出现错误(下面的日志文件)。如果我没有定义 mounts 参数,Dag 可以正常工作。所以我猜一些信息或特权没有传递给由 celery worker 容器创建的容器。有什么建议下一步该尝试什么吗?

DAG 文件:

日志文件: