作为一个 Airflow 和 Python 新手,我什至不知道我问的问题是否正确,但无论如何都要问。我在 CentOS 系统上配置了气流。使用远程 MySql 实例作为后端。在我的代码中,需要获取一些变量,代码如下所示:
import os
from airflow.models import Variable
print(os.environ['SHELL'])
local_env['SHELL'] = Variable.get('SHELL')
我收到以下错误:
回溯(最后一次调用):文件“test2.py”,第 5 行,在 local_env['SHELL'] = Variable.get('SHELL') 文件“/com/work/airflowenv/lib/python2.7/site -packages/airflow/utils/db.py”,第 53 行,在包装结果 = func(*args, **kwargs) 文件“/com/work/airflowenv/lib/python2.7/site-packages/airflow/models .py",第 3134 行,在 get raise ValueError('Variable {} 不存在'.format(key)) ValueError: 变量 SHELL 不存在
在models.py的这段代码中,正是Variable.get()方法抛出了异常:
@classmethod
@provide_session
def get(cls, key, default_var=None, deserialize_json=False, session=None):
obj = session.query(cls).filter(cls.key == key).first()
if obj is None:
if default_var is not None:
return default_var
else:
raise ValueError('Variable {} does not exist'.format(key))
其中 session.query 已经产生无。不太明白这里是如何注入会话的。以及为什么没有设置这些会话变量。我们应该在远程 MySQL 实例上设置一些东西吗?
顺便说一句,我们在具有本地 mysql 实例的另一台机器上有另一个相同的气流实例。并且单独运行我提供的脚本没有问题:
[2016-09-27 01:54:48,341] { init .py:36} 信息 - 使用执行器
本地执行器
/bin/bash /bin/bash
设置气流时我错过了什么?谢谢,