4

在我尝试运行其中一个 dag 之后,当它产生此错误时,我正在尝试通过命令“气流调度程序”运行任务。

Traceback (most recent call last):
File "/usr/local/bin/airflow", line 28, in <module>
args.func(args)
File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 839, in scheduler
job.run()
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 200, in run
self._execute()
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 1309, in _execute
self._execute_helper(processor_manager)
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 1441, in _execute_helper
self.executor.heartbeat()
File "/usr/local/lib/python3.5/dist-packages/airflow/executors/base_executor.py", line 132, in heartbeat
self.sync()
File "/usr/local/lib/python3.5/dist-packages/airflow/executors/celery_executor.py", line 88, in sync
state = async.state
File "/home/userName/.local/lib/python3.5/site-packages/celery/result.py", line 436, in state
return self._get_task_meta()['status']
File "/home/userName/.local/lib/python3.5/site-packages/celery/result.py", line 375, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/home/userName/.local/lib/python3.5/site-packages/celery/backends/amqp.py", line 156, in get_task_meta
binding.declare()
File "/home/userName/.local/lib/python3.5/site-packages/kombu/entity.py", line 605, in declare
self._create_queue(nowait=nowait, channel=channel)
File "/home/userName/.local/lib/python3.5/site-packages/kombu/entity.py", line 614, in _create_queue
self.queue_declare(nowait=nowait, passive=False, channel=channel)
File "/home/userName/.local/lib/python3.5/site-packages/kombu/entity.py", line 649, in queue_declare
nowait=nowait,
 File "/home/userName/.local/lib/python3.5/site-packages/amqp/channel.py", line 1147, in queue_declare
nowait, arguments),
File "/home/userName/.local/lib/python3.5/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/home/userName/.local/lib/python3.5/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/home/userName/.local/lib/python3.5/site-packages/amqp/transport.py", line 258, in write
self._write(s)
**ConnectionResetError: [Errno 104] Connection reset by peer**

我正在使用 Python 3.5、Airflow 1.8、Celery 4.1.0 和 RabbitMQ 3.5.7 作为工作人员:看起来我在 RabbitMQ 上遇到了问题,但我无法弄清楚原因。

4

2 回答 2

0

有同样的问题。

您的 dag 包含对服务器的许多 API 调用,并且您的气流调度程序有一个限制,一次没有特定数量的请求要遵守,但您应该反复试验以找到适用于您的气流环境的数字. 通常发生在您的 dag 有 n 个要同时运行的任务时。

答案中声称的任何更新都无法解决此问题,即使我使用的是最新版本,我也会收到错误消息。

于 2021-09-01T08:48:25.900 回答
0

报告的错误似乎是在 Airflow 1.10.0 中解决的已识别错误。

于 2019-04-19T17:23:24.613 回答