I just upgraded my Airflow from 1.10.13 to 2.0. I am running it in Kubernetes (AKS on Azure) with the Kubernetes Executor. Unfortunately, my scheduler gets killed every 15-20 minutes because its liveness probe fails, so the pod keeps restarting.
I did not have any of these problems in 1.10.13.
This is my liveness probe:
import os
os.environ['AIRFLOW__CORE__LOGGING_LEVEL'] = 'ERROR'
os.environ['AIRFLOW__LOGGING__LOGGING_LEVEL'] = 'ERROR'

from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname
import sys

with create_session() as session:
    # Fetch the most recent SchedulerJob record for this host.
    job = session.query(SchedulerJob).filter_by(hostname=get_hostname()).order_by(
        SchedulerJob.latest_heartbeat.desc()).limit(1).first()

# Healthy (exit 0) only if the latest heartbeat is recent enough.
sys.exit(0 if job.is_alive() else 1)
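As an aside (not the cause of the restarts): this probe raises an AttributeError, and therefore reports the scheduler as unhealthy, whenever the query returns no row at all, e.g. on a fresh metadata database before the first heartbeat is written. A minimal hardened sketch of the same check; the None guard is my own addition:

import os
os.environ['AIRFLOW__CORE__LOGGING_LEVEL'] = 'ERROR'
os.environ['AIRFLOW__LOGGING__LOGGING_LEVEL'] = 'ERROR'

from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname
import sys

with create_session() as session:
    job = session.query(SchedulerJob).filter_by(hostname=get_hostname()).order_by(
        SchedulerJob.latest_heartbeat.desc()).limit(1).first()

# Assumption: treat a missing SchedulerJob row as unhealthy instead of crashing.
sys.exit(0 if job is not None and job.is_alive() else 1)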
When I look at the scheduler logs, I see the following:
[2021-02-16 12:18:21,883] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor489-Process' pid=12812 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:22,228] {scheduler_job.py:933} DEBUG - No tasks to consider for execution.
[2021-02-16 12:18:22,232] {base_executor.py:147} DEBUG - 0 running task instances
[2021-02-16 12:18:22,232] {base_executor.py:148} DEBUG - 0 in queue
[2021-02-16 12:18:22,232] {base_executor.py:149} DEBUG - 32 open slots
[2021-02-16 12:18:22,232] {base_executor.py:158} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-02-16 12:18:22,233] {kubernetes_executor.py:337} DEBUG - Syncing KubernetesExecutor
[2021-02-16 12:18:22,233] {kubernetes_executor.py:263} DEBUG - KubeJobWatcher alive, continuing
[2021-02-16 12:18:22,234] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,234] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,236] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,246] {scheduler_job.py:1390} DEBUG - Next timed event is in 0.143059
[2021-02-16 12:18:22,246] {scheduler_job.py:1392} DEBUG - Ran scheduling loop in 0.05 seconds
[2021-02-16 12:18:22,422] {scheduler_job.py:933} DEBUG - No tasks to consider for execution.
[2021-02-16 12:18:22,426] {base_executor.py:147} DEBUG - 0 running task instances
[2021-02-16 12:18:22,426] {base_executor.py:148} DEBUG - 0 in queue
[2021-02-16 12:18:22,426] {base_executor.py:149} DEBUG - 32 open slots
[2021-02-16 12:18:22,427] {base_executor.py:158} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-02-16 12:18:22,427] {kubernetes_executor.py:337} DEBUG - Syncing KubernetesExecutor
[2021-02-16 12:18:22,427] {kubernetes_executor.py:263} DEBUG - KubeJobWatcher alive, continuing
[2021-02-16 12:18:22,439] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
[2021-02-16 12:18:22,452] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12819)
[2021-02-16 12:18:22,460] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor490-Process' pid=12819 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:23,009] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12826)
[2021-02-16 12:18:23,017] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor491-Process' pid=12826 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:23,594] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12833)
... many more of these "Disposing DB connection pool" entries here ...
[2021-02-16 12:20:08,212] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor675-Process' pid=14146 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:08,916] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14153)
[2021-02-16 12:20:08,924] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor676-Process' pid=14153 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:09,475] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14160)
[2021-02-16 12:20:09,484] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor677-Process' pid=14160 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:10,044] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14167)
[2021-02-16 12:20:10,053] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor678-Process' pid=14167 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:10,610] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14180)
[2021-02-16 12:23:42,287] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
[2021-02-16 12:23:43,290] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 9286
[2021-02-16 12:23:43,494] {process_utils.py:201} INFO - Waiting up to 5 seconds for processes to exit...
[2021-02-16 12:23:43,503] {process_utils.py:61} INFO - Process psutil.Process(pid=14180, status='terminated', started='12:20:09') (14180) terminated with exit code None
[2021-02-16 12:23:43,503] {process_utils.py:61} INFO - Process psutil.Process(pid=9286, status='terminated', exitcode=0, started='12:13:35') (9286) terminated with exit code 0
[2021-02-16 12:23:43,506] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 9286
[2021-02-16 12:23:43,506] {scheduler_job.py:1296} INFO - Exited execute loop
[2021-02-16 12:23:43,523] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2021-02-16 12:23:43,525] {settings.py:290} DEBUG - Disposing DB connection pool (PID 7)