8

也许我在 Twisted 的异步设计中遗漏了一些东西,但我似乎找不到“外部”调用 sendMessage() 方法的方法。我的意思是,发送消息而不仅仅是在 Twisted/AutobahnWebsockets 的回调方法中(例如在 onOpen 或在 onMessage() 从服务器接收数据时)

当然我可以启动一个线程并调用 my_protocol_instance.sendMessage("hello") 但这会破坏异步设计的所有目的,对吗?

在一个具体的例子中,我需要一个顶级包装类来打开连接并管理它,并且每当我需要时,我都会调用 my_class.send_my_toplevel_message(msg)。我该如何实施?

希望我的解释很清楚。

谢谢

4

2 回答 2

3

我设法通过在另一个线程中运行 Twisted 来实现我需要的东西,让我的程序可以自由运行并允许它使用 reactor.callFromThread() 在 Twisted 中触发发送数据。

你怎么看?

# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
    def __init__(self, factory):
        self.factory = factory

    def onOpen(self):
        log.debug("Client connected")
        self.factory.protocol_instance = self
        self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        return _WebSocketClientProtocol(self)
# ------ end twisted -------

class BaseWBClient(object):

    def __init__(self, websocket_settings):
        self.settings = websocket_settings
        # instance to be set by the own factory
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):
        log.debug("Connecting to %(host)s:%(port)d" % self.settings)
        self.factory = _WebSocketClientFactory(
                                "ws://%(host)s:%(port)d" % self.settings,
                                debug=True)
        self.factory.base_client = self
        c = connectWS(self.factory)
        self._reactor_thread = threading.Thread(target=reactor.run,
                                               args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        if not self._check_connection():
            return
        log.debug("Queing send")
        self._send_queue.put(body)
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        if not self._connected_event.wait(timeout=10):
            log.error("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        log.debug("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        reactor.callFromThread(reactor.stop)
于 2013-09-23T23:17:29.617 回答
3

为什么需要一个线程来启动 protocolInstance.sendMessage() ?这可以在正常的反应器回路中完成。

扭曲的核心是反应器,当您认为扭曲本身是反应性的时,它可以更容易地看待事物- 这意味着它作为对其他事物的反应(响应)来做某事。

现在我假设您正在谈论的线程也由于某些事件或活动或状态而在调用 sendMessage 时创建和创建。我很难想象这样一种情况,你只需要突然发送一条消息而没有任何反应。

但是,如果有一个事件应该触发 sendMessage,则无需在线程中调用它:只需使用扭曲的机制来捕获该事件,然后从该特定事件的回调中调用 sendMessage。

现在来看你的具体例子:你能在这个问题的上下文中具体说明“每当我需要”的含义吗?来自另一个连接的输入?来自用户的输入?循环活动?

于 2013-09-20T09:19:37.947 回答