
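I've looked through similar posts on SO, but they all seem to be specific to Docker environments and weren't much help. Our setup is a bit different: we run an Airflow Docker image hosted on Azure App Service, but it connects to a managed Azure Database for PostgreSQL server (version 11).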

Python = 3.8  
Apache Airflow = 2.1.4  
SQLAlchemy = 1.3.24  
Executor = Local

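The environment is set up and works fine most of the time. However, when we run DAGs that process large amounts of data (usually a few GB), we suddenly run into heartbeat problems. I have tried setting keepalive values in the Airflow config through the sql_alchemy_connect_args variable, and raising web_server_master_timeout and web_server_worker_timeout to higher values, but neither helped.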
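For reference, in Airflow 2.x `sql_alchemy_connect_args` (in the `[core]` section) takes an import path to a dict, which Airflow passes to SQLAlchemy's `create_engine(connect_args=...)`; psycopg2 then forwards the keys to libpq as connection parameters. A minimal sketch of such a module, assuming a hypothetical module name `keepalive_settings` that is importable on the PYTHONPATH, with illustrative rather than tuned values:

```python
"""keepalive_settings.py (hypothetical module name).

Referenced from airflow.cfg, for example:

    [core]
    sql_alchemy_connect_args = keepalive_settings.CONNECT_ARGS

or via the environment variable AIRFLOW__CORE__SQL_ALCHEMY_CONNECT_ARGS.
"""

# libpq connection parameters; the numbers below are assumptions, in seconds.
CONNECT_ARGS = {
    "keepalives": 1,            # enable TCP keepalives on the client socket
    "keepalives_idle": 30,      # idle time before the first keepalive probe is sent
    "keepalives_interval": 10,  # time between unanswered keepalive probes
    "keepalives_count": 5,      # unanswered probes before the connection is dropped
    "connect_timeout": 10,      # maximum wait when opening a new connection
}
```

Note that TCP keepalives only act on connections that are already open. The traceback shows the failure occurring while a new connection is being created (`_create_connection` leading into `psycopg2.connect`), at the host-name lookup step, so keepalive settings alone may not affect it.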

Error:

    {base_job.py:222} ERROR - LocalTaskJob heartbeat got an exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
        return fn()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 364, in connect
        return _ConnectionFairy._checkout(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 778, in _checkout
        fairy = _ConnectionRecord.checkout(pool)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 495, in checkout
        rec = pool._do_get()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 241, in _do_get
        return self._create_connection()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 309, in _create_connection
        return _ConnectionRecord(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 440, in __init__
        self.__connect(first_connect_check=True)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
        pool.logger.debug("Error on connect(): %s", e)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
        compat.raise_(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
        raise exception
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
        connection = pool._invoke_creator(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
        return dialect.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 508, in connect
        return self.dbapi.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
        conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
    psycopg2.OperationalError: could not translate host name "<address>" to address: Temporary failure in name resolution
    
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 194, in heartbeat
        session.merge(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2166, in merge
        return self._merge(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2244, in _merge
        merged = self.query(mapper.class_).get(key[1])
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 1018, in get
        return self._get_impl(ident, loading.load_on_pk_identity)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 1135, in _get_impl
        return db_load_fn(self, primary_key_identity)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 286, in load_on_pk_identity
        return q.one()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3490, in one
        ret = self.one_or_none()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3459, in one_or_none
        ret = list(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
        return self._execute_and_instances(context)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3556, in _execute_and_instances
        conn = self._get_bind_args(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3571, in _get_bind_args
        return fn(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3550, in _connection_from_session
        conn = self.session.connection(**kw)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1142, in connection
        return self._connection_for_bind(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1150, in _connection_for_bind
        return self.transaction._connection_for_bind(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 433, in _connection_for_bind
        conn = bind._contextual_connect()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2302, in _contextual_connect
        self._wrap_pool_connect(self.pool.connect, None),
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2339, in _wrap_pool_connect
        Connection._handle_dbapi_exception_noconnection(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1583, in _handle_dbapi_exception_noconnection
        util.raise_(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
        raise exception
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
        return fn()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 364, in connect
        return _ConnectionFairy._checkout(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 778, in _checkout
        fairy = _ConnectionRecord.checkout(pool)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 495, in checkout
        rec = pool._do_get()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 241, in _do_get
        return self._create_connection()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 309, in _create_connection
        return _ConnectionRecord(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 440, in __init__
        self.__connect(first_connect_check=True)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
        pool.logger.debug("Error on connect(): %s", e)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
        compat.raise_(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
        raise exception
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
        connection = pool._invoke_creator(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
        return dialect.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 508, in connect
        return self.dbapi.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
        conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
    sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not translate host name "<address>" to address: Temporary failure in name resolution
    
    (Background on this error at: http://sqlalche.me/e/13/e3q8)
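Because the underlying error is `could not translate host name ... Temporary failure in name resolution`, which libpq reports when the host-name lookup fails, one way to check whether DNS lookups from inside the App Service container fail intermittently is to resolve the database host name repeatedly from that same container. A rough diagnostic sketch using only the standard library; the function name `probe_dns` and the default attempt count and delay are hypothetical choices:

```python
import socket
import sys
import time


def probe_dns(host, port=5432, attempts=10, delay=1.0):
    """Resolve `host` repeatedly and record the outcome of each attempt.

    Returns a list of (attempt_number, error_message_or_None) tuples.
    """
    results = []
    for i in range(1, attempts + 1):
        try:
            # Same resolver path (getaddrinfo) that libpq uses for host names.
            socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
            results.append((i, None))
        except socket.gaierror as exc:
            # EAI_AGAIN is rendered as "Temporary failure in name resolution".
            results.append((i, str(exc)))
        if i < attempts:
            time.sleep(delay)
    return results


if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else "localhost"
    for attempt, error in probe_dns(target):
        print(attempt, "ok" if error is None else error)
```

Running it as `python probe_dns.py <address>` inside the container, ideally while one of the large DAGs is executing, would show whether lookups fail only under load.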

Can anyone help me with this? I'm at my wit's end, and I'm not even sure whether I'm debugging this in the right direction.

