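I switched from SequentialExecutor to LocalExecutor because I want instances of one of my DAGs to run in parallel. I get a Postgres error whether I run them in parallel or as a single instance.
My DAG has 3 BashOperator tasks: "test1_run", "test2_run", and "test3_run".
I trigger the DAG from the CLI with the following commands: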
airflow dags trigger 'parallel_test_dag' --conf '{"message":"test1"}'
and then,
airflow dags trigger 'parallel_test_dag' --conf '{"message":"test2"}'
from two different terminals
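For anyone trying to reproduce this, the two triggers can also be started from a single script instead of two terminals. This is only a minimal sketch and not what I originally ran: the helper names `build_trigger_cmd` and `trigger_concurrently` are made up for illustration, and it assumes the `airflow` CLI is on the PATH of the user running it.

```python
import json
import subprocess

# DAG name taken from the trigger commands above.
DAG_ID = "parallel_test_dag"


def build_trigger_cmd(dag_id, message):
    """Build the argv list for `airflow dags trigger` with a JSON --conf payload."""
    # Compact separators reproduce the exact payload used on the command line.
    conf = json.dumps({"message": message}, separators=(",", ":"))
    return ["airflow", "dags", "trigger", dag_id, "--conf", conf]


def trigger_concurrently(messages):
    """Start one trigger per message without waiting in between, then collect exit codes."""
    procs = [subprocess.Popen(build_trigger_cmd(DAG_ID, m)) for m in messages]
    return [p.wait() for p in procs]
```

Calling `trigger_concurrently(["test1", "test2"])` should start both trigger commands at roughly the same time, much like running them from two terminals.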
Here is the stack trace:
Traceback (most recent call last):
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
psycopg2.OperationalError: SSL error: sslv3 alert bad record mac
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubu20/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
return f(*args, **kwargs)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 64, in scheduler
job.run()
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
self._execute()
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1299, in _execute
self._run_scheduler_loop()
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1396, in _run_scheduler_loop
num_finished_events = self._process_executor_events(session=session)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1236, in _process_executor_events
tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3373, in all
return list(self)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
return self._execute_and_instances(context)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: sslv3 alert bad record mac
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.execution_date = %(execution_date_1)s AND task_instance.task_id IN (%(task_id_1)s)]
[parameters: {'dag_id_1': 'parallel_test_dag', 'execution_date_1': datetime.datetime(2021, 9, 16, 17, 54, 38, tzinfo=Timezone('UTC')), 'task_id_1': 'test1_run'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
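I get this error when one task finishes: after the previous task completes successfully, Airflow tries to load the next task and fails.
Any idea how to fix this?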