问题标签 [airflow-2.x]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
redis - Airflow 2.0 - 即使池插槽可用,任务仍处于排队状态
蟒蛇依赖:
- 阿帕奇气流==2.1.3
- redis==3.5.3
- 芹菜==4.4.7
气流配置:
- core.parallelism = 1000
- core.dag_concurrency = 400
- core.max_active_runs_per_dag = 1
- celery.worker_concurrency = 6
- celery.broker_url = redis
- celery.result_backend = mysql
升级到气流 2.0 后面临这个问题
并非所有任务都卡在“排队”状态,一次只有 1 个任务卡住(超过 100 个)。
有时任务也停留在“运行”状态。
此问题在 1-2 天内发生一次。
即使池槽可用(槽 = 128,运行槽 = 0,排队槽 = 1)也会发生这种情况
根据工作人员日志(对于卡住的任务和正常任务):
我在下面看到所有正常工作任务的日志,这对于卡住的任务是缺失的:
apache-spark - 我可以使用 Airflow 启动/停止火花流作业吗
我有两种类型的工作:Spark Batch 作业和 Spark 流式作业。
我想用气流来安排和管理它们。
用于作业的气流已停止。但我想将它用于我的流媒体工作。谁能给我一些想法或其他工具来完成我的流媒体工作
airflow - 如何最好地将“ftp to s3 operator”合并到现有的 Airflow DAG 中?希望使用airflow.providers.amazon.aws.transfers.ftp_to_s3
我对 Airflow 比较陌生,正在寻求将 #following #airflow.providers.amazon.aws.transfers.ftp_to_s3 引入我的 DAG 的帮助。#参数(s3_bucket、aws_conn_id、ftp_conn_id 等)如何传递给这个操作符?这些参数会存在于引用的 python 脚本中吗?
docker - 如何设置气流工作者以允许网络服务器使用 docker 在不同机器上获取日志?
我最近刚刚使用 docker 容器安装了气流 2.1.4,我已经成功地在同一台机器上使用 docker-compose 设置了 postgres、redis、调度程序、2x 本地工作人员和花。
现在我想扩展,并在其他机器上设置工人。
我能够让工作人员启动并运行,flower 能够找到工作节点,工作人员正在从调度程序正确接收任务,但无论任务的结果状态如何,任务都会被标记为失败并显示错误消息如下所示:
然后我尝试替换AIRFLOW__CORE__HOSTNAME_CALLABLE: 'socket.getfqdn'
为AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'
我收到了这个错误:
然后我尝试将worker的端口8793与其主机(在下面的worker_4中)映射,现在它正在返回:
但有时仍会出现“名称解析暂时失败”错误。
我还尝试复制错误中的 URL,并将 IP 替换为主机 ip,并收到以下消息:
如果需要其他信息,请告诉我。
提前致谢!
下面是调度程序/webserver/flower 的 docker-compose.yml:
和我的 docker-compose.yml 用于另一台机器上的工人:
airflow - 无法为 Airflow LocalExecutor 设置 MySQL 后端
我需要并行运行 dags,但不需要显着扩展,因此 LocalExecutor 可以很好地完成这项工作。我查看了 Airflow 文档并首先创建了一个 MySQL 数据库:
然后我在airflow.cfg文件中修改以下参数:
当我运行airflow db init时,我遇到以下错误消息:
AttributeError:“MySQLConverter”对象没有属性“_dagruntype_to_mysql”
在处理上述异常的过程中,又发生了一个异常:TypeError: Python 'dagruntype' cannot be convert to a MySQL type
请注意,airflow.cfg文件中没有其他任何内容被更改,并且使用带有 sqlite 的默认 SequentialExecutor 可以让一切正常运行。另请注意,我使用的是 Airflow 版本 2.2.0
airflow - 如何将 conf 传递给计划的 DAG
当手动触发 DAG 时,有多种方法可以传递配置。它可以从 UI 中完成,通过气流 CLI 使用--conf
参数和使用 REST API。
但是,当使用 cron 表达式调度 DAG 时,DAG 总是会失败,因为 DAG 中的任务需要来自conf
.
是否有可用于为值设置“默认”值的 DAG 级别配置conf
(无需对 Python 代码本身进行空检查并硬编码默认值)
我不想在代码本身中执行此 null 检查的原因是因为我希望conf
尽可能通过 Airflow API 公开键和默认值
airflow - 如何毫不拖延地安排执行?
我有一个问题,我需要每个月的第一天启动一个 DAG,但我有一个问题,DAG 于 10 月 1 日开始,但在 11 月 1 日当天执行,我需要 10 月 1 日执行 10 月 1 日和 11 月 1 日执行 1十一月,不耽误执行一个月。
我的调度程序是:'0 10 1 * *'
谢谢
python - Apache Airflow:如何编写自定义传感器来检查特定条件并执行操作
我想在 Airflow 2 中编写一个自定义传感器。标准过程是覆盖 poke 方法以检查您想要的条件并在达到时终止任务。但我想要的是让我的传感器最初执行操作,然后休眠并检查特定条件。
我如何通过覆盖 poke 方法来做到这一点。我要执行的操作必须只发生一次,但在每个戳间隔期间,戳方法都会运行以检查条件。我如何在不覆盖执行方法的情况下实现这一点?
docker - Airflow DockerOperator 挂载导致基于 docker-compose 的 Airflow 设置出错
我正在使用 docker-compose 和 celery 执行器运行 Airflow 2.1.4。到目前为止,我已经能够从 celery 工作容器启动和运行简单的 DockerOperator 任务,但是现在当我尝试将目录从共享驱动器挂载到任务容器时,出现错误(下面的日志文件)。如果我没有定义 mounts 参数,Dag 可以正常工作。所以我猜一些信息或特权没有传递给由 celery worker 容器创建的容器。有什么建议下一步该尝试什么吗?
DAG 文件:
日志文件: