我浏览了关于 SO 的类似帖子,它们似乎专门针对使用 Docker 环境,并没有太大帮助。我们的有点不同,我们运行托管在Azure 应用服务上的 Airflow 的 docker 映像,但它连接到托管的 Azure Database for PostgreSQL 服务器(版本 11)。
Python = 3.8
Apache Airflow = 2.1.4
SQL Alchemy = 1.3.24
Executor = Local
环境已经设置好,在大多数情况下都可以正常工作。但是,当我们运行处理大量数据(通常为几 GB)的 DAG 时,我们会突然遇到 Heartbeat 问题。现在,我尝试通过sql_alchemy_connect_args变量在Airflow Config中为 Keep Alives设置值,并将变量web_server_master_timeout和web_server_worker_timeout更改为更高的值无济于事。
错误:
{base_job.py:222} ERROR - LocalTaskJob heartbeat got an exception
Traceback (most recent call last):
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py”, line 2336, in _wrap_pool_connect
return fn()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 364, in connect
return _ConnectionFairy._checkout(self)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 778, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 495, in checkout
rec = pool._do_get()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py”, line 241, in _do_get
return self._create_connection()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 309, in _create_connection
return _ConnectionRecord(self)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 440, in __init__
self.__connect(first_connect_check=True)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 661, in __connect
pool.logger.debug(“Error on connect(): %s”, e)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py”, line 68, in __exit__
compat.raise_(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py”, line 182, in raise_
raise exception
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 656, in __connect
connection = pool._invoke_creator(self)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py”, line 114, in connect
return dialect.connect(*cargs, **cparams)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py”, line 508, in connect
return self.dbapi.connect(*cargs, **cparams)
File “/usr/local/lib/python3.8/site-packages/psycopg2/__init__.py”, line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: could not translate host name “<address>” to address: Temporary failure in name resolution
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File “/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py”, line 194, in heartbeat
session.merge(self)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py”, line 2166, in merge
return self._merge(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py”, line 2244, in _merge
merged = self.query(mapper.class_).get(key[1])
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py”, line 1018, in get
return self._get_impl(ident, loading.load_on_pk_identity)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py”, line 1135, in _get_impl
return db_load_fn(self, primary_key_identity)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py”, line 286, in load_on_pk_identity
return q.one()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py”, line 3490, in one
ret = self.one_or_none()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py”, line 3459, in one_or_none
ret = list(self)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py”, line 3535, in __iter__
return self._execute_and_instances(context)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py”, line 3556, in _execute_and_instances
conn = self._get_bind_args(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py”, line 3571, in _get_bind_args
return fn(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py”, line 3550, in _connection_from_session
conn = self.session.connection(**kw)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py”, line 1142, in connection
return self._connection_for_bind(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py”, line 1150, in _connection_for_bind
return self.transaction._connection_for_bind(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py”, line 433, in _connection_for_bind
conn = bind._contextual_connect()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py”, line 2302, in _contextual_connect
self._wrap_pool_connect(self.pool.connect, None),
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py”, line 2339, in _wrap_pool_connect
Connection._handle_dbapi_exception_noconnection(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py”, line 1583, in _handle_dbapi_exception_noconnection
util.raise_(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py”, line 182, in raise_
raise exception
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py”, line 2336, in _wrap_pool_connect
return fn()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 364, in connect
return _ConnectionFairy._checkout(self)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 778, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 495, in checkout
rec = pool._do_get()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py”, line 241, in _do_get
return self._create_connection()
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 309, in _create_connection
return _ConnectionRecord(self)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 440, in __init__
self.__connect(first_connect_check=True)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 661, in __connect
pool.logger.debug(“Error on connect(): %s”, e)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py”, line 68, in __exit__
compat.raise_(
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py”, line 182, in raise_
raise exception
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py”, line 656, in __connect
connection = pool._invoke_creator(self)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py”, line 114, in connect
return dialect.connect(*cargs, **cparams)
File “/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py”, line 508, in connect
return self.dbapi.connect(*cargs, **cparams)
File “/usr/local/lib/python3.8/site-packages/psycopg2/__init__.py”, line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not translate host name “<address>” to address: Temporary failure in name resolution
(Background on this error at: http://sqlalche.me/e/13/e3q8)
有人可以帮我解决这个问题吗?我束手无策,我不确定,如果我正在朝着正确的方向调试这个。