问题标签 [airflow-scheduler]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
20939 浏览

airflow - 气流调度器调度后续任务很慢

当我尝试在 Airflow 1.8.0 中运行 DAG 时,我发现在完成前任任务的时间和后继任务被拾取执行的时间之间需要很长时间(通常单个任务的执行时间更长)。Sequential、Local 和 Celery Executors 的场景也是如此。有没有办法减少提到的开销时间?(就像airflow.cfg中可以加快DAG执行的任何参数一样?)已添加甘特图以供参考: 甘特图

0 投票
0 回答
1018 浏览

sqlalchemy - Airflow heartbeat FATAL:抱歉,已经有太多客户了

有没有人在运行 subdag 时在心跳阶段遇到“抱歉,已经有太多客户”错误?

[2017-11-22 08:21:23,747] {jobs.py:2136} 错误 - 尝试心跳时出现异常!休眠 5.0s Traceback(最近一次调用最后):文件“/usr/local/lib/python2.7/site-packages/airflow/jobs.py”,第 2131 行,在 _execute self.heartbeat() 文件“/usr /local/lib/python2.7/site-packages/airflow/jobs.py",第 180 行,在 heartbeat job = session.query(BaseJob).filter(BaseJob.id == self.id).first() 文件中“/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py”,第 2755 行,在第一个 ret = list(self[0:1]) 文件中“/usr/local/lib/ python2.7/site-packages/sqlalchemy/orm/query.py”,第 2547 行,在getitem 返回列表(res)文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query. py",第 2855 行,在iter中 返回self._execute_and_instances(上下文)文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py”,第2876行,在_execute_and_instances close_with_result=True)文件“/usr/local/lib /python2.7/site-packages/sqlalchemy/orm/query.py”,第 2885 行,在 _get_bind_args **kw 文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py ",第 2867 行,_connection_from_session conn = self.session.connection(**kw) 文件 "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py",第 998 行,在连接中execution_options=execution_options)文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py”,第 1003 行,_connection_for_bind 引擎,execution_options)文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py”,第 403 行,在 _connection_for_bind conn = bind.contextual_connect() 文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base. py”,第 2112 行,在 contextual_connect self._wrap_pool_connect(self.pool.connect, None) 中,文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py”,第 2151 行,在 _wrap_pool_connect e、方言、自我)文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py”,第 1465 行,在 _handle_dbapi_exception_noconnection exc_info 文件“/usr/local/lib/python2 .7/site-packages/sqlalchemy/util/compat.py”,第 203 行,在 raise_from_cause reraise(类型(异常),异常,tb=exc_tb,cause=cause)文件“/usr/local/lib/python2.7 /site-packages/sqlalchemy/engine/base.py”,第 2147 行,在 _wrap_pool_connect 返回 fn() 文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/pool.py”,第 387 行,在连接中返回 _ConnectionFairy._checkout(self) 文件“ /usr/local/lib/python2.7/site-packages/sqlalchemy/pool.py”,第 766 行,在 _checkout 仙女 = _ConnectionRecord.checkout(pool) 文件“/usr/local/lib/python2.7/site- packages/sqlalchemy/pool.py”,第 516 行,在结帐中 rec = pool._do_get() 文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/pool.py”,第 1229 行,在 _do_get返回 self._create_connection() 文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/pool.py”,第 333 行,在 _create_connection 返回 _ConnectionRecord(self) 文件“/usr/local/lib/python2 .7/site-packages/sqlalchemy/pool.py",第 461 行,在初始化 自我。连接(first_connect_check=True)文件“/usr/local/lib/python2.7/site-packages/sqlalchemy/pool.py”,第 651 行,在 __connect 连接 = pool._invoke_creator(self) 文件“/usr/local/ lib/python2.7/site-packages/sqlalchemy/engine/strategies.py”,第 105 行,在连接中返回 dialect.connect(*cargs, **cparams) 文件“/usr/local/lib/python2.7/site -packages/sqlalchemy/engine/default.py”,第 393 行,在连接中返回 self.dbapi.connect(*cargs, **cparams) 文件“/usr/local/lib/python2.7/site-packages/psycopg2/ __init .py",第 130 行,在连接 conn = _connect(dsn, connection_factory=connection_factory, **kwasync) OperationalError: (psycopg2.OperationalError) FATAL: 抱歉,已经有太多客户端

从airflow.cfg:在airflow.cfg:

sql_alchemy_pool_size = 100

sql_alchemy_pool_recycle = 3600

这个问题有已知的解决方案吗?我正在使用带 LocalExecutor 的气流 1.8.2。

0 投票
0 回答
319 浏览

airflow - Airflow DAG 中的条件触发需要更多时间来触发下一个 DAG?

我们有 10 个气流 DAG,我们正在使用条件触发器运行下一个 DAG。但是下一个 DAG 不会立即开始。触发需要 5 分钟。

例子:

Dag1 于 10:05 完成 Dag2 于 10:10 开始

如果我们能够消除 5 分钟的差距,整个 ETL 过程将在 2.5 小时内完成。但现在大约需要 3 到 4 个小时。

配置文件:

0 投票
3 回答
17198 浏览

airflow - 整个 DAG 的气流取决于过去

有没有办法在气流中使用depends_on_past整个 DagRun,而不仅仅是应用于任务?

我有一个每日 DAG,周五 DagRun 在第 4 个任务上出错,但周六和周日 DagRuns 仍然按计划运行。使用depends_on_past = True会在相同的第 4 个任务上暂停 DagRun,但前 3 个任务仍会运行。

我可以在 DagRun DB 表中state看到包含failedFriday DagRun 的一列。我想要的是一种将 DagRun 配置为在先前的 DagRun 失败时不启动的方法,在找到以前失败的任务之前不启动并运行。

有谁知道这是否可能?

0 投票
2 回答
7167 浏览

airflow - 如何从气流列表中删除损坏的 DAG?

在气流中显示有关 Broken DAG 的信息

我尝试使用 DAG copy_from_Oracle_to_MySQL.py 从 /data/airflow/dags 移动文件。

但气流显示信息

我必须做什么才能从 GUI 中清除有关 Broken DAG 的信息?

0 投票
1 回答
332 浏览

rabbitmq - 调度程序不排队作业

我正在尝试在 Kubernetes 上测试 Airflow。Scheduler、Worker、Queue 和 Webserver 都在不同的部署中,我使用 Celery Executor 来运行我的任务。

一切正常,除了调度程序无法排队作业。当我从 Web UI 或 CLI 手动执行 Airflow 时,它能够很好地运行我的任务,但我正在尝试测试调度程序以使其工作。

我的配置几乎和单台服务器上的一样:

我相信通过这些配置,我应该能够使其运行,但由于某种原因,只有工作人员能够看到 DAG 及其状态,而不是调度程序,即使调度程序能够很好地记录他们的心跳. 还有什么我应该调试或查看的吗?

0 投票
2 回答
1786 浏览

python - Ariflow [Errno 104] 对等方重置连接

在我尝试运行其中一个 dag 之后,当它产生此错误时,我正在尝试通过命令“气流调度程序”运行任务。

我正在使用 Python 3.5、Airflow 1.8、Celery 4.1.0 和 RabbitMQ 3.5.7 作为工作人员:看起来我在 RabbitMQ 上遇到了问题,但我无法弄清楚原因。

0 投票
2 回答
2750 浏览

airflow - 气流可以用来运行永无止境的任务吗?

我们可以通过将 task/dag 超时设置为 None 并手动触发其运行来使用气流 dag 来定义永无止境的作业(即具有无条件循环来消耗流数据的任务)吗?让气流监测器永无止境的任务会导致问题吗?谢谢

0 投票
1 回答
2677 浏览

scheduler - 气流调度程序没有接收 DAG 运行

我正在设置气流,以便网络服务器在一台机器上运行,调度程序在另一台机器上运行。两者共享同一个 MySQL Metastore 数据库。两个实例都出现在日志中没有任何错误,但调度程序没有拾取通过 Web UI 手动触发 DAG 创建的任何 DAG 运行。

MySQL 中的 dag_run 表显示了一些条目,都处于运行状态:

但是在另一台机器上启动并连接到同一个 MySQL 数据库的调度程序只是对与该数据库通信并实际运行这些 DAG 运行并将它们转换为任务实例不感兴趣。

不确定我在此处的设置中缺少什么。这么几个问题:

  1. 位于 $AIRFLOW_HOME/dags 的 DAGS 文件夹何时以及如何填充?我认为它是在网络服务器启动时。但是如果我只是在另一台机器上启动调度程序,那台机器上的 DAGS 文件夹将如何被填满?
  2. 目前,我只在托管网络服务器的机器上而不是在调度程序上执行气流 initdb。希望这是正确的。

我可以为调度程序启用调试日志以获取更多可能表明缺少什么的日志吗?从当前日志来看,它看起来只是在本地系统上的 DAGS 文件夹中查找,并且没有发现那里的 DAGS(甚至没有示例),尽管配置加载示例设置为 True。

不要认为这很重要,但我目前正在使用 LocalExecutor

任何帮助表示赞赏。

编辑:我知道我需要按照气流文档的建议在机器之间同步 DAGS 文件夹,但不确定这是否是调度程序在上述情况下没有接收任务的原因。

0 投票
3 回答
6889 浏览

ssl - 如何在 Apache Airflow 上启用 SSL?

我正在使用带有 LocalExecutor 的 Airflow 1.7.0,文档建议要启用 SSL,我们需要传递证书和密钥路径并将端口更改为 443,如下所示

我创建了使用 OpenSSL 生成的证书和密钥。创建证书/密钥时提供的详细信息也是正确的。但是,Airflow UI 仍然是 http 而不是 https。

任何指针都会有所帮助!

谢谢!