在我尝试运行其中一个 dag 之后,当它产生此错误时,我正在尝试通过命令“气流调度程序”运行任务。
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 28, in <module>
args.func(args)
File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 839, in scheduler
job.run()
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 200, in run
self._execute()
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 1309, in _execute
self._execute_helper(processor_manager)
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 1441, in _execute_helper
self.executor.heartbeat()
File "/usr/local/lib/python3.5/dist-packages/airflow/executors/base_executor.py", line 132, in heartbeat
self.sync()
File "/usr/local/lib/python3.5/dist-packages/airflow/executors/celery_executor.py", line 88, in sync
state = async.state
File "/home/userName/.local/lib/python3.5/site-packages/celery/result.py", line 436, in state
return self._get_task_meta()['status']
File "/home/userName/.local/lib/python3.5/site-packages/celery/result.py", line 375, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/home/userName/.local/lib/python3.5/site-packages/celery/backends/amqp.py", line 156, in get_task_meta
binding.declare()
File "/home/userName/.local/lib/python3.5/site-packages/kombu/entity.py", line 605, in declare
self._create_queue(nowait=nowait, channel=channel)
File "/home/userName/.local/lib/python3.5/site-packages/kombu/entity.py", line 614, in _create_queue
self.queue_declare(nowait=nowait, passive=False, channel=channel)
File "/home/userName/.local/lib/python3.5/site-packages/kombu/entity.py", line 649, in queue_declare
nowait=nowait,
File "/home/userName/.local/lib/python3.5/site-packages/amqp/channel.py", line 1147, in queue_declare
nowait, arguments),
File "/home/userName/.local/lib/python3.5/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/home/userName/.local/lib/python3.5/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/home/userName/.local/lib/python3.5/site-packages/amqp/transport.py", line 258, in write
self._write(s)
**ConnectionResetError: [Errno 104] Connection reset by peer**
我正在使用 Python 3.5、Airflow 1.8、Celery 4.1.0 和 RabbitMQ 3.5.7 作为工作人员:看起来我在 RabbitMQ 上遇到了问题,但我无法弄清楚原因。