我正在尝试在 celery 任务中调用带有 nameko 的 rpc 函数,当我执行此操作时,它没有延迟就可以工作,但是随着延迟我得到一个错误
@rpc
def apply_rpc_async(self, **kwargs):
print(kwargs)
print("calling background task")
self.remote_task.delay(kwargs)
print("called background task")
@app.task
def remote_task(self, **kwargs):
with ServiceRpcProxy('remote_service', config={
"AMQP_URI": f'amqp://guest:guest@{rabbitmq_host}:5672//'}) as rpc_proxy:
print(f'executed')
return getattr(rpc_proxy, 'bg_add')(kwargs)
[2021-11-02 15:03:15,440: ERROR/MainProcess] 任务 background_nameko.services.remote_task[4cd31b92-9140-4265-8e03-e9b9e57552cd] 引发意外:ConnectionRefusedError(111, 'ECONNREFUSED') Traceback(最近一次调用最后):文件“/python3.7/site-packages/amqp/transport.py”,第 138 行,在 _connect 主机、端口、系列、socket.SOCK_STREAM、SOL_TCP)文件“lib/python3.7/site-packages/eventlet /support/greendns.py",第 539 行,在 getaddrinfo socktype、proto、aiflags) 文件 "/usr/lib/python3.7/socket.py",第 752 行,在 getaddrinfo 中用于 _socket.getaddrinfo(host, port , family, type, proto, flags): socket.gaierror: [Errno -9] Address family for hostname not supported
在处理上述异常的过程中,又出现了一个异常:
回溯(最后一次调用):文件“lib/python3.7/site-packages/celery/app/trace.py”,第 412 行,在 trace_task R = retval = fun(*args, **kwargs) 文件“lib /python3.7/site-packages/celery/app/trace.py”,第 704 行,在protected_call中 return self.run(*args, **kwargs) 文件“/background_nameko/services.py”,第 36 行,在 remote_task “AMQP_URI”:'amqp://guest:guest@172.21.0.6:5672//'})作为 rpc_proxy:文件“lib/python3.7/site-packages/nameko/standalone/rpc.py”,第 252 行,在进入 返回 self.start() 文件“/lib/python3.7/site-packages/nameko/standalone/rpc.py”,第 258 行,在 start self.reply_listener.start() 文件“lib/python3.7/site- packages/nameko/standalone/rpc.py”,第 87 行,开始可能是_declare(self.queue, conn.channel()) 文件“lib/python3.7/site-packages/kombu/connection.py”,第 289 行,在通道 chan = self.transport.create_channel(self.connection) 文件“lib/python3.7/site-packages/kombu/connection.py”,第 867 行,连接 max_retries=1,reraise_as_library_errors=False 文件“lib/python3 .7/site-packages/kombu/connection.py”,第 445 行,在 _ensure_connection 回调中,timeout=timeout 文件“lib/python3.7/site-packages/kombu/utils/functional.py”,第 344 行,在 retry_over_time返回 fun(*args, **kwargs) 文件“lib/python3.7/site-packages/kombu/connection.py”,第 874 行,在 _connection_factory self._connection = self._establish_connection() 文件“lib/python3.7/site-packages/kombu/connection.py”中,第 809 行_establish_connection conn = self.transport.establish_connection() 文件“lib/python3.7/site-packages/kombu/transport/pyamqp.py”,第 130 行,在建立连接 conn.connect() 文件“lib/python3.7/site -packages/amqp/connection.py”,第 314 行,连接 self.transport.connect() 文件“lib/python3.7/site-packages/amqp/transport.py”,第 78 行,连接 self._connect( self.host、self.port、self.connect_timeout)文件“lib/python3.7/site-packages/amqp/transport.py”,第 149 行,_connect“无法解析代理主机名”))文件“lib/python3 .7/site-packages/amqp/transport.py",第 162 行,在 _connect self.sock.connect(sa) 文件“lib/python3.7/site-packages/eventlet/greenio/base.py”中,第 267 行,在连接 socket_checkerr(fd) 文件“lib/python3.7 /site-packages/eventlet/greenio/base.py",第 51 行,在 socket_checkerr 中引发 socket.error(err, errno.errorcode[err]) ConnectionRefusedError: [Errno 111] ECONNREFUSED