任何人都可以将一个完整的新手指向我可以了解 Python 框架以测试 socket.io 的地方。我非常擅长编写用于测试静态 API 的脚本,但之前从未使用过 WebSocket。
谢谢你的帮助!
在此处的 gevent-socketIO 示例代码中有大量使用多个流行框架的示例https://github.com/abourget/gevent-socketio/tree/master/examples
如果你想使用 Python 与 socket.io 服务器通信,你可以使用socketIO-client。
from socketIO_client import SocketIO
def on_bbb_response(*args):
print 'on_bbb_response', args
with SocketIO('localhost', 8000) as socketIO:
socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response)
socketIO.wait_for_callbacks(seconds=1)
这是我尝试过的。
服务器.py
import json
from aiohttp import web
import socketio
sio = socketio.AsyncServer()
# Creates a new Aiohttp Web Application
app = web.Application()
sio.attach(app)
@sio.on('message')
async def print_message(sid, data):
print("worked :")
if data['type'] == "enter":
print("----- "+data['username'] + " joined the room -----")
elif data['type'] == "exit":
print("----- "+data['username'] + " left the room -----")
else:
print(data['username']+" > "+data['message'])
if __name__ == '__main__':
web.run_app(app)
客户端.py
import time
import socketio
sio = socketio.Client()
sio.connect('http://localhost:8080')
username = input("Enter username : ") or "Unknown"
sio.emit("message", {'username':username,'message':'','type':'enter'})
while True:
print("Enter message : ")
msg = input()
sio.emit("message", {'username':username,'message':msg,'type':'chat'})
print("to quit press y")
key = input()
if key == 'y' or key == 'Y':
sio.emit("message", {'username': username, 'message': "",'type':'exit'})
time.sleep(1)
sio.disconnect()
break
sio.emit("message", {'username': username, 'message': msg})
print("you disconnected")