11

我是气流的新手。我正在尝试通过参考这篇文章https://stlong0521.github.io/20161023%20-%20Airflow.html使用 Celery Executor 设置气流的分布式模式

在详细了解规范之前,我想确认我已经在单独的实例上安装了 PostgreSQL

设置的规格详述如下:

气流核心/服务器计算机

  • 蟒蛇 3.5
    • 气流(AIRFLOW_HOME = ~/airflow)
    • 芹菜
    • 精神科gp2
  • 兔MQ

在 airflow.cfg 中进行的配置

sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

进行的测试:

RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)

气流工作者计算机

已安装以下内容:

  • Python 3.5 与
    • 气流(AIRFLOW_HOME = ~/airflow)
    • 芹菜
  • 精神科gp2

airflow.cfg 中的配置与服务器中的配置完全相同:

sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

在工作机器上运行的命令的输出:

运行气流花时:

[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//

我在Airflow Core 机器中传递 dag,并且我已经将 dag 将处理的示例数据(Excel 表)复制到同一台核心机器。

我的工人日志 raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1

现在我的查询是

1)我是否也应该将 dag 文件夹复制到工作计算机

2)现在,我还没有复制工作计算机上的 dag 文件夹,我无法看到工作进程接收任务。

请指出我在哪里犯了错误以及如何让工作进程接手该进程。

4

4 回答 4

4

Airflow 的一些最大痛点出现在部署和保持 DAG 文件和插件在 Airflow 调度程序、Airflow 网络服务器和 Celery 工作节点之间同步。

我们创建了一个名为Astronomer Open的开源项目,它可以自动执行 Dockerized Airflow、Celery 和 PostgreSQL 以及其他一些好东西。这个项目的动机是看到这么多人遇到相同的痛点,创建了一个非常相似的设置。

例如,这是 Airflow Dockerfile:https ://github.com/astronomer/astronomer/blob/master/docker/airflow/1.10.2/Dockerfile

和文档:https ://open.astronomer.io/

全面披露:这是我在工作中参与的一个项目——我们还提供了一个在 Kubernetes 上运行的付费企业版(文档)。也就是说,开放版完全免费使用。

于 2018-05-31T17:05:21.040 回答
3

您的配置文件看起来不错。正如您所怀疑的,所有工作人员确实都需要 DAG 文件夹的副本。您可以使用类似的东西git使它们保持同步和最新。

于 2018-02-21T18:16:49.733 回答
0

对此有点晚了,但它可能仍然对某人有所帮助,因为从现有答案看来,除了“手动”部署(通过 git/scp 等)之外,没有办法共享 DAG,虽然有办法。

Airflow 支持酸洗-p来自 CLI 或command: scheduler -pdocker-compose 文件中的参数),它允许在服务器/主服务器上部署 DAG,并将它们序列化并发送给工作人员(因此您不必在多个部署 DAG并且您可以避免 DAG 不同步的问题)。

酸洗与CeleryExecutor.

Pickling 有一些限制可能会让你反感,特别是类和函数的实际代码没有序列化(只有完全限定的名称),所以如果你尝试反序列化引用你没有的代码的 DAG 将会出错在目标环境中。有关泡菜的更多信息,您可以在这里查看:https ://docs.python.org/3.3/library/pickle.html

于 2019-10-03T12:22:50.550 回答
0

是的,dag 必须存在于所有气流节点上 - 工作程序、网络服务器、调度程序。

您可以在所有节点上的 dags 文件夹中运行 git pull 的 cron 以保持同步。

Airflow 会将所有 dag 移动到数据库而不是文件系统——这个特性可能会在 2.0 中出现。

于 2020-07-22T21:31:29.240 回答