4

我正在尝试同时autobahn.asyncio.wamp.ApplicationSession在 python 中运行两个 s。以前,我按照本文的答案中的建议使用了高速公路库的修改来做到这一点。我现在需要更专业的解决方案。

在谷歌上搜索了一段时间后,这篇文章看起来很有希望,但使用的是twisted库,而不是asyncio. 我无法为图书馆的asyncio分支找到类似的解决方案autobahn,因为它似乎没有使用Reactors.

我遇到的主要问题ApplicationRunner.run()是阻塞(这就是我之前将它外包给线程的原因),所以我不能只ApplicationRunner在它之后运行一秒钟。

我确实需要同时访问 2 个 websocket 通道,而我似乎无法使用单个ApplicationSession.

到目前为止我的代码:

from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time


channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'

class LTCComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('LTCComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel1)
        except Exception as e:
            print("Could not subscribe to topic:", e)

class XMRComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('XMRComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel2)
        except Exception as e:
            print("Could not subscribe to topic:", e)

def main():
    runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
    runner.run(LTCComponent)
    runner.run(XMRComponent) # <- is not being called


if __name__ == "__main__":

    try:
        main()
    except KeyboardInterrupt:
        quit()

    except Exception as e:
        print(time.time(), e)

我对autobahn图书馆的了解有限,恐怕文档并没有太大改善我的情况。我在这里忽略了什么吗?一个函数,一个参数,它可以让我组合我的组件或同时运行它们?

也许与此处提供的类似解决方案实现了替代方案ApplicationRunner


相关话题

以扭曲的方式运行两个 ApplicationSession

在线程中运行 Autobahn ApplicationRunner

Autobahn.wamp.ApplicationSession 源

Autobahn.wamp.Applicationrunner 源代码


根据要求,来自@stovfl 的回溯使用multithreading代码回答:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/nils/anaconda3/lib/python3.5/threading.py", line     914, in _bootstrap_inner
    self.run()
  File "/home/nils/git/tools/gemini_wss/t2.py", line 27, in run
    self.appRunner.run(self.__ApplicationSession)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 143,     in run
    transport_factory = WampWebSocketClientFactory(create,         url=self.url,                 serializers=self.serializers)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     319, in __init__
    WebSocketClientFactory.__init__(self, *args, **kwargs)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     268, in __init__
    self.loop = loop or asyncio.get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 626, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 572, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-2'.
Exception in thread Thread-1:
**Same as in Thread-2**
...
RuntimeError: There is no current event loop in thread 'Thread-1'.
4

2 回答 2

2

正如我从 中看到的traceback,我们只达到第 2 步,共 4 步

来自asyncio文档:
该模块提供了使用协程编写单线程并发代码的基础架构,通过套接字和其他资源多路复用 I/O 访问

所以我放弃了我的第一个建议,使用multithreading.
我可以想象以下三个选项:

  1. multiprocessing代替multithreading
  2. coroutine里面做asyncio loop
  3. channels在之间切换def onJoin(self, details)

第二个建议,第一个选项使用multiprocessing.
我可以开始两个asyncio loops,所以appRunner.run(...)应该工作。

如果唯一不同,您可以使用其中一个。 如果您需要通过不同的添加到class ApplicationSessionchannelclass ApplicationSessionargs=

class __ApplicationSession(ApplicationSession):
        # ...
        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])
        except Exception as e:
            # ...

import multiprocessing as mp
import time

def ApplicationRunner_process(realm, channel):
        appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
        appRunner.run(__ApplicationSession)

if __name__ == "__main__":
    AppRun = [{'process':None, 'channel':'BTC_LTC'},
              {'process': None, 'channel': 'BTC_XMR'}]

    for app in AppRun:
        app['process'] =  mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
        app['process'].start()
        time.sleep(0.1)

    AppRun[0]['process'].join()
    AppRun[1]['process'].join()
于 2017-03-14T17:19:00.180 回答
1

按照您为 twisted 链接的方法,我设法通过 asyncio 设置start_loop=False获得相同的行为

import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
coro1 = runner1.run(MyApplicationSession, start_loop=False)

runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
coro2 = runner2.run(MyApplicationSession, start_loop=False)

asyncio.get_event_loop().run_until_complete(coro1)
asyncio.get_event_loop().run_until_complete(coro2)
asyncio.get_event_loop().run_forever()

class MyApplicationSession(ApplicationSession):

    def __init__(self, cfg):
        super().__init__(cfg)
        self.cli_id = cfg.extra['cli_id']

   def onJoin(self, details):
        print("session attached", self.cli_id)
于 2019-03-15T09:58:43.667 回答