我正在尝试将aiohttp
Web 服务器集成到 Crossbar+Autobahn 系统架构中。
更详细地说,当aiohttp
服务器接收到某个 API 调用时,它必须向 Crossbar 路由器发布消息。我在官方 repos 上看到过这个例子,但我不知道如何将它集成到我的应用程序中。
理想情况下,我希望能够做到这一点
# class SampleTaskController(object):
async def handle_get_request(self, request: web.Request) -> web.Response:
self.publisher.publish('com.myapp.topic1', 'Hello World!')
return web.HTTPOk()
其中self
il 的一个实例SampleTaskController(object)
定义了 Web 服务器的所有路由处理程序。
def main(argv):
cfg_path = "./task_cfg.json"
if len(argv) > 1:
cfg_path = argv[0]
logging.basicConfig(level=logging.DEBUG,
format=LOG_FORMAT)
loop = zmq.asyncio.ZMQEventLoop()
asyncio.set_event_loop(loop)
app = web.Application(loop=loop)
with open(cfg_path, 'r') as f:
task_cfg = json.load(f)
task_cfg['__cfg_path'] = cfg_path
controller = SampleTaskController(task_cfg)
controller.restore()
app['controller'] = controller
controller.setup_routes(app)
app.on_startup.append(controller.on_startup)
app.on_cleanup.append(controller.on_cleanup)
web.run_app(app,
host=task_cfg['webserver_address'],
port=task_cfg['webserver_port'])
请注意,我使用的是一个,zmq.asyncio.ZMQEventLoop
因为服务器也在侦听在方法zmq
内部配置的套接字controller.on_startup
。
我没有使用高速公路,而是尝试使用 Crossbar 将消息发布到 Crossbarwampy
并且它可以工作,但是高速公路订阅者无法正确解析消息。
# autobahn subscriber
class ClientSession(ApplicationSession):
async def onJoin(self, details):
self.log.info("Client session joined {details}", details=details)
self.log.info("Connected: {details}", details=details)
self._ident = details.authid
self._type = u'Python'
self.log.info("Component ID is {ident}", ident=self._ident)
self.log.info("Component type is {type}", type=self._type)
# SUBSCRIBE
def gen_on_something(thing):
def on_something(counter, id, type):
print('----------------------------')
self.log.info("'on_{something}' event, counter value: {message}",something=thing, message=counter)
self.log.info("from component {id} ({type})", id=id, type=type)
return on_something
await self.subscribe(gen_on_something('landscape'), 'landscape')
await self.subscribe(gen_on_something('nature'), 'nature')
-
# wampy publisher
async def publish():
router = Crossbar(config_path='./crossbar.json')
logging.getLogger().debug(router.realm)
logging.getLogger().debug(router.url)
logging.getLogger().debug(router.port)
client = Client(router=router)
client.start()
result = client.publish(topic="nature", message=0)
logging.getLogger().debug(result)
使用此配置,订阅者会收到已发布的消息,但在解析时会出现异常。
TypeError: on_something() got an unexpected keyword argument 'message'