1

我正在使用气流 1.7.1.3 和 python 2.7

当我使用单独运行每个任务时,我创建了一个完美运行的 DAG

气流测试 [myDAG] [myTask] 2016-10-14

然而,

气流 trigger_dag [myDAG]

或者

气流运行 [myDAG] [myTask] 2016-10-14

两者都会引发“此连接已关闭”SQLalchemy 错误。

[...]
    with self.engine.connect() as connection:
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2016, in connect
    return self._connection_cls(self, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 72, in __init__
    if connection is not None else engine.raw_connection()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2102, in raw_connection
    self.pool.unique_connection, _connection)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2072, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 318, in unique_connection
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 746, in _checkout
    raise exc.InvalidRequestError("This connection is closed")
InvalidRequestError: This connection is closed
[2016-10-14 15:49:30,704] {models.py:1306} INFO - Marking task as FAILED.
[2016-10-14 15:49:30,712] {models.py:1327} ERROR - This connection is closed

这是通过 SQLalchemy 与 Oracle 12 数据库的连接,当我在脚本中使用 session.commit() 时会引发此错误。

有人知道什么可以解释这种差异和错误吗?

4

1 回答 1

1

这是此处概述的已知错误。在修复此错误之前,您将无法通过 SQLAlchemy 连接到 Oracle。

问题是由于源代码中的一些 SQL 语法造成的。您不能在 Oracle 中说“SELECT 1”,您需要说“SELECT 1 FROM DUAL”。

也许考虑使用 Airflow 中的一个钩子:https ://github.com/apache/incubator-airflow/tree/master/airflow/hooks

似乎有一个 oracle_hook 可以帮助你。祝你好运。

于 2016-12-05T13:40:19.080 回答