在套接字方面，我完全是个初学者，所以如果这个问题对你来说过于简单，请多多包涵。以下是我在 GitLab 上找到的代码，我正在尝试理解它：
import os
import logging
import uuid
import socketio
from aiohttp import web
import sys
sys.path.append('.')
# Configure the root logger: warnings and above, with a short
# "month-day hour:minute" timestamp in front of every record.
logging.basicConfig(
    datefmt='%m-%d %H:%M',
    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
    level=logging.WARNING,
)
# Home page
async def index(request):
    """Serve the demo home page.

    Reads ``examples/rasa_demo/templates/index.html`` (path relative to the
    current working directory) and returns its contents as an HTML response.

    Args:
        request: The incoming aiohttp request (unused).

    Returns:
        A ``web.Response`` whose body is the UTF-8 encoded page.
    """
    # The context manager closes the file handle on every request; the
    # explicit encoding keeps the read independent of the platform locale,
    # since the body is sent as UTF-8 below.
    with open('examples/rasa_demo/templates/index.html', encoding='utf-8') as index_file:
        html = index_file.read()
    return web.Response(body=html.encode('utf-8'), headers={'content-type': 'text/html'})
# Action endpoint
async def webhook(request):
    """Webhook to retrieve action calls.

    Parses the request body as JSON, passes it to ``executor.run`` and
    returns the result as a JSON response.

    Args:
        request: The incoming aiohttp request carrying the action call.

    Returns:
        A JSON ``web.Response``. If the executor raises
        ``ActionExecutionRejection`` the error is logged and a 400 response
        with the ``error`` text and ``action_name`` is returned instead.
    """
    action_call = await request.json()
    try:
        response = await executor.run(action_call)
    except ActionExecutionRejection as e:
        logger.error(str(e))
        rejection = {"error": str(e), "action_name": e.action_name}
        # A plain dict has no ``status_code`` attribute and is not a valid
        # aiohttp handler return value, so build the 400 response explicitly.
        return web.json_response(rejection, status=400)
    return web.json_response(response)
# Web app routing: home page, action webhook and static assets.
app = web.Application()
app.add_routes(
    [
        web.get('/', index),
        web.post('/webhook', webhook),
        web.static('/static', 'examples/rasa_demo/static'),
    ]
)

# Instantiate all bot agents once at start-up; the 'user_uttered' handler
# indexes this object by language code.
bots = BotFactory.createAll()

# Socket.IO server attached to the same aiohttp application, so websocket
# traffic and the regular HTTP endpoints above share one app.
sio = socketio.AsyncServer(async_mode='aiohttp', cors_allowed_origins='*')
sio.attach(app)
@sio.on('session_request')
async def on_session_request(sid, data):
    """Handle a 'session_request' event and confirm the session id.

    The decorator registers this coroutine as the handler for events named
    'session_request' received by ``sio``; nothing in this file emits that
    event, so it is presumably sent by the connected client.

    Args:
        sid: Socket.IO session id of the client that sent the event.
        data: Event payload; may be ``None`` or lack a usable 'session_id',
            in which case a fresh random id is generated.
    """
    if data is None:
        data = {}
    if data.get('session_id') is None:
        data['session_id'] = uuid.uuid4().hex
    # Reply only to the requesting client. Without ``room`` the event is
    # broadcast to every connected client, leaking one user's session id to
    # all the others.
    await sio.emit('session_confirm', data['session_id'], room=sid)
@sio.on('user_uttered')
async def on_user_uttered(sid, message):
    """Handle a 'user_uttered' event and send the bot's replies back.

    The decorator registers this coroutine as the handler for events named
    'user_uttered' received by ``sio``; nothing in this file emits that
    event, so it is presumably sent by the connected client.

    Args:
        sid: Socket.IO session id of the client that sent the event.
        message: Event payload. 'message' holds the user text (default '')
            and 'customData' may hold a 'lang' key selecting the bot
            (default 'en').
    """
    # ``or {}`` also covers an explicit ``customData: null`` from the
    # client, which a ``.get`` default alone would pass through as None.
    custom_data = message.get('customData') or {}
    lang = custom_data.get('lang', 'en')
    user_message = message.get('message', '')
    bot_responses = await bots[lang].handle_text(user_message)
    for bot_response in bot_responses:
        payload = __parse_bot_response(bot_response)
        # Deliver each reply only to the client that sent the message.
        await sio.emit('bot_uttered', payload, room=sid)
我想了解的是：像“session_request”或“user_uttered”这样的事件在这段代码中从未被发出（emit），事件处理程序是如何捕获到它们的？谢谢。