0

作为一个 Airflow 和 Python 新手,我什至不知道我问的问题是否正确,但无论如何都要问。我在 CentOS 系统上配置了气流。使用远程 MySql 实例作为后端。在我的代码中,需要获取一些变量,代码如下所示:

import os
from airflow.models import Variable

print(os.environ['SHELL'])
local_env['SHELL'] = Variable.get('SHELL')

我收到以下错误:

回溯(最后一次调用):文件“test2.py”,第 5 行,在 local_env['SHELL'] = Variable.get('SHELL') 文件“/com/work/airflowenv/lib/python2.7/site -packages/airflow/utils/db.py”,第 53 行,在包装结果 = func(*args, **kwargs) 文件“/com/work/airflowenv/lib/python2.7/site-packages/airflow/models .py",第 3134 行,在 get raise ValueError('Variable {} 不存在'.format(key)) ValueError: 变量 SHELL 不存在

在models.py的这段代码中,正是Variable.get()方法抛出了异常:

    @classmethod
    @provide_session
    def get(cls, key, default_var=None, deserialize_json=False, session=None):
        obj = session.query(cls).filter(cls.key == key).first()
        if obj is None:
            if default_var is not None:
                return default_var
            else:
                raise ValueError('Variable {} does not exist'.format(key))

其中 session.query 已经产生无。不太明白这里是如何注入会话的。以及为什么没有设置这些会话变量。我们应该在远程 MySQL 实例上设置一些东西吗?

顺便说一句,我们在具有本地 mysql 实例的另一台机器上有另一个相同的气流实例。并且单独运行我提供的脚本没有问题:

[2016-09-27 01:54:48,341] { init .py:36} 信息 - 使用执行器

本地执行器

/bin/bash /bin/bash

设置气流时我错过了什么?谢谢,

4

2 回答 2

1

好的,终于解决了问题。我所做的是打印出查询,并确定变量必须来自某个称为变量的关系数据库表。并挖掘后端数据库,找到数据库,将其与工作数据库进行比较,发现“变量”表数据丢失。添加这些变量的方法很简单:气流变量 -s SHELL /bin/bash 等其他变量。

于 2016-09-26T21:02:49.033 回答
0

一般来说,当需要使用 Airflow db 来存储一些信息时,Airflow 中的 Variable 模型已经被用作轻量级的键值存储。

如果要存储执行参数,尤其是看起来像静态参数的参数,我建议将其作为 DAG 定义中的 params 文件。

于 2018-01-17T17:28:31.877 回答