我正在python3中学习asyncio,我写了一个简单的RPC服务器和客户端来学习,但是当我用asyncio.run_until_complete测试它时,它在future已经设置后阻塞,代码如下,检查主要部分。我正在使用 python 3.4.2
import asyncio
import struct
_req_struct = struct.Struct('<4i')
_req_size = _req_struct.size
_resp_struct = struct.Struct('<3i')
_resp_size = _resp_struct.size
class SimpleRPCClient(asyncio.Protocol):
_transport = None
def __init__(self):
self._pendings = {}
self.seq_id = 0
self._cached = bytearray()
def connection_made(self, transport):
"""
First event, create ref to transport
"""
self._transport = transport
def send(self, data):
"""
proxy to transport.write
"""
self._transport.write(data)
def data_received(self, data):
c = self._cached
c.extend(data)
cursor = 0
while len(self._cached) >= (cursor + _resp_size):
rid, status, result = _resp_struct.unpack(
c[cursor:cursor + _resp_size])
if rid in self._pendings:
f = self._pendings.pop(rid)
f.set_result(result)
print("future result: ", f.result())
def calc(self, oprand, op1, op2):
rid = self.seq_id
self.seq_id += 1
future = asyncio.Future()
self._pendings[rid] = future
self.send(_req_struct.pack(rid, oprand, op1, op2))
return future
if __name__ == '__main__':
loop = asyncio.get_event_loop()
host = "127.0.0.1"
port = 8087
_tr, client = loop.run_until_complete(loop.create_connection(
SimpleRPCClient,
host,
port))
f = client.calc(1, 3, 5)
loop.run_until_complete(f)
print("final result: ", f.result())
功能是计算 3+5,当我运行程序时,它会在 data_received 中正确显示结果,如
未来结果:8
但是之后程序阻塞,从不显示最终结果,future设置成功但函数不返回,这是为什么呢?
如果您想运行它,服务器端附在下面
import asyncio
import struct
_req_struct = struct.Struct('<4i')
_req_size = _req_struct.size
_resp_struct = struct.Struct('<3i')
_resp_size = _resp_struct.size
class SimpleRPCServer(object):
def build_protocol(self):
return SimpleRPCConnection()
class SimpleRPCConnection(asyncio.Protocol):
_transport = None
def __init__(self):
self._cached = bytearray()
def connection_made(self, transport):
"""
First event, create ref to transport
"""
self._transport = transport
def send(self, data):
"""
proxy to transport.write
"""
self._transport.write(data)
def data_received(self, data):
c = self._cached
c.extend(data)
cursor = 0
while len(self._cached) >= (cursor + _req_size):
seq_id, oprand, op1, op2 = _req_struct.unpack(
c[cursor:cursor + _req_size])
self.respond(seq_id, oprand, op1, op2)
cursor += _req_size
if cursor > len(self._cached):
self._cached.clear()
else:
self._cached[:cursor] = b''
def respond(self, seq_id, oprand, op1, op2):
if oprand == 1:
self.send(_resp_struct.pack(seq_id, 0, op1 + op2))
elif oprand == 2:
self.send(_resp_struct.pack(seq_id, 0, op1 - op2))
elif oprand == 3:
self.send(_resp_struct.pack(seq_id, 0, op1 * op2))
elif oprand == 4:
try:
self.send(_resp_struct.pack(seq_id, 0, op1 / op2))
except ZeroDivisionError:
self.send(_resp_struct.pack(seq_id, 1, 0))
else:
self.send(_resp_struct.pack(seq_id, 0xff, 0))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
server = SimpleRPCServer()
loop.run_until_complete(loop.create_server(
server.build_protocol,
"0.0.0.0",
8087))
loop.run_forever()