我正在使用 Autobahn 连接到服务器并接收"推送"(push)通知,并且我想借助 Twisted 事件循环制作一个简单的 urwid 界面。但是我不确定从我的 Autobahn 处理程序类中设置 urwid 文本的最佳方法是什么。在下面的代码中,您可以看到我当前的实现:我想从 "MyFrontendComponent" 类中调用 "updateText" 方法。
这样做的最佳方法是什么?
import urwid
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.endpoints import clientFromString
from autobahn.twisted import wamp, websocket
from autobahn.wamp import types
from autobahn.wamp.serializer import *
class MyFrontendComponent(wamp.ApplicationSession, object):
    """WAMP client session that queries a time service and listens on a topic."""

    @inlineCallbacks
    def onJoin(self, details):
        """Call the time procedure once, then subscribe to the event topic.

        The result of ``com.timeservice.now`` (or the error it raised) is
        printed. Afterwards every event on ``com.myapp.topic1`` is printed
        and counted in ``self.received``; once the count exceeds 5 the
        session asks to leave.
        """
        ## call a remote procedure
        try:
            current_time = yield self.call(u'com.timeservice.now')
        except Exception as err:
            print("Error: {}".format(err))
        else:
            print("Current time from time service: {}".format(current_time))

        ## subscribe to a topic
        self.received = 0

        def handle_event(payload):
            ## Print and count each event; nothing more to do until the
            ## count has passed 5.
            print("Got event: {}".format(payload))
            self.received += 1
            if self.received <= 5:
                return
            self.leave()

        subscription = yield self.subscribe(handle_event, u'com.myapp.topic1')
        print("Subscribed with subscription ID {}".format(subscription.id))

    def onDisconnect(self):
        """Disconnect hook: stop the Twisted reactor."""
        reactor.stop()
class MyApp(object):
    """urwid text UI driven by the Twisted reactor, fed by a WAMP client.

    Constructing an instance connects a WAMP-over-WebSocket client running
    ``MyFrontendComponent`` to ``tcp:127.0.0.1:8080`` and then runs the urwid
    main loop, so ``MyApp()`` does not return until the UI exits.
    """

    ## Class-level widget: shared by all instances and reachable as
    ## ``MyApp.txt`` even while ``__init__`` is still blocked in the main loop.
    txt = urwid.Text(u"Hello World")

    ## The running urwid.MainLoop, or None while the UI is not running.
    _loop = None

    def __init__(self):
        """Connect the WAMP client and run the urwid main loop until exit."""
        component_config = types.ComponentConfig(realm="realm1")
        session_factory = wamp.ApplicationSessionFactory(config=component_config)
        session_factory.session = MyFrontendComponent

        transport_factory = websocket.WampWebSocketClientFactory(
            session_factory,
            serializers=[JsonSerializer()],
            debug=False,
            debug_wamp=False)

        client = clientFromString(reactor, "tcp:127.0.0.1:8080")
        ## connect() returns a Deferred. Without an errback a refused or
        ## unreachable server would go unnoticed behind the full-screen UI.
        client.connect(transport_factory).addErrback(self._connection_failed)

        fill = urwid.Filler(self.txt, 'top')
        ## Keep the loop on the instance so updateText() can repaint while
        ## the UI is running.
        self._loop = urwid.MainLoop(
            fill,
            unhandled_input=self.show_or_exit,
            event_loop=urwid.TwistedEventLoop())
        try:
            self._loop.run()
        finally:
            ## The screen is no longer active; stop updateText() from
            ## trying to paint on it.
            self._loop = None

    def _connection_failed(self, failure):
        """Errback for the client connection: show the error in the UI."""
        self.updateText(u"Connection failed: {}".format(failure.getErrorMessage()))

    def updateText(self, input):
        """Show *input* in the text widget and repaint if the UI is running.

        urwid only redraws automatically after handling its own input and
        alarms. When this is called from another Twisted callback (for
        example a WAMP event handler) the screen must be repainted
        explicitly.
        """
        self.txt.set_text(input)
        loop = self._loop
        if loop is not None:
            loop.draw_screen()

    def show_or_exit(self, key):
        """Unhandled-input hook: quit on 'q'/'Q', otherwise display the key."""
        if key in ('q', 'Q'):
            raise urwid.ExitMainLoop()
        self.txt.set_text(repr(key))
if __name__ == '__main__':
    ## Constructing the app connects to the server and runs the UI loop.
    MyApp()
以下是服务器端代码:
import sys
import six
import datetime
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.endpoints import serverFromString
from autobahn.wamp import types
from autobahn.twisted.util import sleep
from autobahn.twisted import wamp, websocket
class MyBackendComponent(wamp.ApplicationSession):
    """WAMP backend session: serves the UTC time and publishes a counter."""

    @inlineCallbacks
    def onJoin(self, details):
        """Register the time procedure, then publish one event per loop pass.

        ``com.timeservice.now`` returns the current UTC time formatted as
        ``%Y-%m-%dT%H:%M:%SZ``. After registration an increasing counter is
        published to ``com.myapp.topic1`` in an endless loop, yielding on
        ``sleep(1)`` between events.
        """
        ## register a procedure for remote calling
        def current_utc_timestamp():
            print("Someone is calling me;)")
            stamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
            return six.u(stamp)

        registration = yield self.register(current_utc_timestamp, u'com.timeservice.now')
        print("Registered procedure with ID {}".format(registration.id))

        ## publish events to a topic
        event_number = 0
        while True:
            self.publish(u'com.myapp.topic1', event_number)
            print("Published event.")
            event_number += 1
            yield sleep(1)
if __name__ == '__main__':
    ## 0) start logging to console
    log.startLogging(sys.stdout)

    ## 1) create a WAMP router factory and
    ## 2) wrap it in a WAMP router session factory
    session_factory = wamp.RouterSessionFactory(wamp.RouterFactory())

    ## 3) embed the backend application session in the router
    session_factory.add(MyBackendComponent(types.ComponentConfig(realm="realm1")))

    ## 4) create a WAMP-over-WebSocket transport server factory
    transport_factory = websocket.WampWebSocketServerFactory(
        session_factory, debug=False, debug_wamp=False)

    ## 5) start the server from a Twisted endpoint
    serverFromString(reactor, "tcp:8080").listen(transport_factory)

    ## 6) now enter the Twisted reactor loop
    reactor.run()
谢谢!