28

我正在使用 Airflow v1.8.1 并在 kubernetes 和 Docker 上运行所有组件(worker、web、flower、scheduler)。我将 Celery Executor 与 Redis 一起使用,我的任务如下所示:

(start) -> (do_work_for_product1)
     ├  -> (do_work_for_product2)
     ├  -> (do_work_for_product3)
     ├  …

所以start任务有多个下游。我设置并发相关配置如下:

parallelism = 3
dag_concurrency = 3
max_active_runs = 1

然后当我手动运行这个 DAG 时(不确定它是否永远不会发生在计划任务上),一些下游被执行,但另一些则停留在“排队”状态。

如果我从管理 UI 中清除任务,它就会被执行。没有工作日志(在处理了一些第一个下游之后,它只是不输出任何日志)。

Web 服务器的日志(不确定worker exiting是否相关)

/usr/local/lib/python2.7/dist-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
  .format(x=modname), ExtDeprecationWarning
[2017-08-24 04:20:56,496] [51] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow_dags
[2017-08-24 04:20:57 +0000] [27] [INFO] Handling signal: ttou
[2017-08-24 04:20:57 +0000] [37] [INFO] Worker exiting (pid: 37)

调度程序也没有错误日志。每当我尝试此操作时,许多卡住的任务都会发生变化。

因为我也使用 Docker,所以我想知道这是否相关: https ://github.com/puckel/docker-airflow/issues/94 但到目前为止,还没有任何线索。

有没有人遇到过类似的问题或知道我可以针对这个问题调查什么......?

4

5 回答 5

8

任务卡住很可能是一个错误。目前(<= 1.9.0alpha1),当任务甚至无法在(远程)工作人员上启动时,可能会发生这种情况。例如,在工作人员过载或缺少依赖项的情况下会发生这种情况。

这个补丁应该可以解决这个问题。

值得研究为什么您的任务没有获得 RUNNING 状态。将自己设置为这种状态是任务所做的第一件事。通常,工作人员在开始执行之前会记录日志,并且还会报告错误。您应该能够在任务日志中找到此条目。

编辑:正如在原始问题的评论中提到的,如果气流无法运行任务的一个例子是它无法写入所需的位置。这使得它无法继续,任务会卡住。该补丁通过使调度程序中的任务失败来解决此问题。

于 2017-10-22T19:52:51.627 回答
3

我一直在研究同一个 docker image puckel。我的问题已通过以下方式解决:

更换

 result_backend = db+postgresql://airflow:airflow@postgres/airflow

celery_result_backend = db+postgresql://airflow:airflow@postgres/airflow

我认为这是 puckel 最新发布的更新。更改在 2018 年 2 月左右恢复,您的评论是在 1 月发表的。

于 2018-04-20T18:19:48.947 回答
3

请尝试airflow schedulerairflow worker命令。

我认为airflow worker调用每个任务,airflow scheduler在两个任务之间调用。

于 2018-07-04T11:34:44.340 回答
2

我们有一个解决方案,想在 1.9 正式发布之前在这里分享。感谢 Bolke de Bruin 对 1.9 的更新。在我在 1.9 之前的情况下,目前我们使用 1.8.1 是为了让另一个 DAG 运行以清除任务,queue state如果它在那里停留超过 30 分钟。

于 2017-10-23T16:25:13.000 回答
0

就我而言,所有 Airflow 任务都卡住了,而且没有一个正在运行。以下是我为修复它所做的步骤:

  1. 杀死所有气流过程,使用$ kill -9 <pid>
  2. 杀死所有芹菜进程,使用$ pkill celery
  3. 在气流.cfg 文件中增加celery 的worker_concurrency, parallelism,配置计数。dag_concurrency
  4. 启动气流,首先检查气流网络服务器是否自动启动,在我的情况下,它正在通过 Gunicorn 运行,否则开始使用$ airflow webserver &
  5. 启动气流调度程序$ airflow scheduler
  6. 启动气流工作者$ airflow worker
  7. 尝试运行作业。
于 2021-01-13T16:12:16.790 回答