我曾经在 Java 中使用 Netty,我真的很喜欢它的概念和做事的方式。现在,我正在开发一个 Python 项目,我需要设计一个能够在许多不同的传输层上异步执行 IO 操作的 API。
我决定创建类似于 Netty 的东西。当然,我知道 Netty 是一个非常大的项目,我永远无法模仿它的出色功能,但我只想在 Python 的 asyncio 之上实现它的一些基础知识。
这是我到目前为止得到的:
from .utility import Next
__all__ = [
"ChannelHandler", "InboundHandler", "OutboundHandler",
"Pipeline", "Channel"
]
class ChannelHandler:
async def connected(self, channel):
raise NotImplementedError()
async def raised(self, channel, exception):
raise NotImplementedError()
async def disconnected(self, channel):
raise NotImplementedError()
class InboundHandler:
async def received(self, channel, data, next):
raise NotImplementedError()
class OutboundHandler:
async def sent(self, channel, message, next):
raise NotImplementedError()
class Pipeline:
def __init__(self, channel, channel_handler, inbound_handlers, outbound_handlers):
self.channel = channel
self.channel_handler = channel_handler
self.inbound_handlers = inbound_handlers
self.outbound_handlers = outbound_handlers
async def handle_raise_event(self, exception):
await self.channel_handler.raised(channel, exception)
async def handle_receive_event(self, data):
await Next(self.inbound_handlers).next(self.channel, data)
async def handle_send_event(self, message):
await Next(self.outbound_handlers).next(self.channel, message)
class Channel:
def __init__(self, url, channel_handler, inbound_handlers, outbound_handlers, options={}):
self.url = url
self.pipeline = Pipeline(self, channel_handler, inbound_handlers, outbound_handlers)
self.options = options
async def connect(self):
pass
async def send(self, message):
pass
async def disconnect(self):
pass
async def connect(url, channel_handler, inbound_handlers, outbound_handlers, options={}):
pass
这是核心 API 模块。这是一个应该如何使用的例子:
import asyncio
import transport
class DummyChannelHandler:
async def connected(self, channel):
print("Connected.")
async def raised(self, channel, exception):
print("Error:", exception)
await channel.disconnect()
async def disconnected(self, channel):
print("Disconected.")
class DummyInboundHandler:
async def received(self, channel, data, next):
print("Received:", data)
class DummyOutboundHandler:
async def sent(self, channel, message, next):
await channel.write(message.encode())
async def main():
channel = await transport.connect(
"udp://192.168.20.244:55300",
DummyChannelHandler(),
[DummyInboundHandler()],
[DummyOutboundHandler()]
)
await channel.send("Hi!")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
正如我所说,我对 asyncio 很陌生。我想问问你的意见,我真的很感激一些建议。就像,这是asyncio
应该使用的方式吗?还是我完全错过了重点?这可能行得通吗?还有一些同步问题呢?我知道锁、队列之类的东西,但我不清楚如何将它们与asyncio
.
我不知道 API 是否设计正确。例如,我不太确定关键字async
是否在正确的地方使用。也一样await
。
如果你碰巧发现了什么,请给我留言,谢谢!