我是气流的新手。我正在尝试通过参考这篇文章https://stlong0521.github.io/20161023%20-%20Airflow.html使用 Celery Executor 设置气流的分布式模式
在详细了解规范之前,我想确认我已经在单独的实例上安装了 PostgreSQL。
设置的规格详述如下:
气流核心/服务器计算机
- 蟒蛇 3.5
- 气流(AIRFLOW_HOME = ~/airflow)
- 芹菜
- 精神科gp2
- 兔MQ
在 airflow.cfg 中进行的配置:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
进行的测试:
RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)
气流工作者计算机
已安装以下内容:
- Python 3.5 与
- 气流(AIRFLOW_HOME = ~/airflow)
- 芹菜
- 精神科gp2
airflow.cfg 中的配置与服务器中的配置完全相同:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
在工作机器上运行的命令的输出:
运行气流花时:
[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//
我在Airflow Core 机器中传递 dag,并且我已经将 dag 将处理的示例数据(Excel 表)复制到同一台核心机器。
我的工人日志
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1
现在我的查询是
1)我是否也应该将 dag 文件夹复制到工作计算机
2)现在,我还没有复制工作计算机上的 dag 文件夹,我无法看到工作进程接收任务。
请指出我在哪里犯了错误以及如何让工作进程接手该进程。