0

我正在使用其 API 使用 threading.Thread 的服务,并且我想在其中使用 Deferreds。

如果我像标准 python 模块一样运行它,我没有任何问题。就像是:

from twisted.internet import reactor
from outside import ServiceUsingThreadingAndDeferred

service = ServiceUsingThreadingAndDeferred()

reactor.listenTCP(port, protocol_factory)

service.start()

reactor.run()

如果我使用 twistd -y运行以下 .tac,则该服务根本不起作用:

from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred

service = ServiceUsingThreadingAndDeferred()

# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)

service.start()

我认为第二种情况的问题是主反应器线程本身没有产生服务线程,但是我不明白为什么(以及是否)它在第一种情况下发生......我使用 callLater 作为解决方法 -成功地:

from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred
from twisted.internet import reactor

service = ServiceUsingThreadingAndDeferred()

# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)

reactor.callLater(1, service.start)

但我不知道这是否是解决这个问题的正确方法。你有什么建议吗?

4

1 回答 1

0

从 Github 存储库中,这个类滥用了 Twisted 的线程 API:

class ServiceUsingThreadingAndDeferred():
    def __init__(self):
        pass

    def start(self):
        print "3rd party API service starting..."
        self.run_as_thread()

    def run_as_thread(self, *args, **kwargs):
        t = threading.Thread(target=self.run_forever, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()

    def run_forever(self):
        while 1:
            print "Doing something remote..."
            time.sleep(1)
            now = time.time()
            if 1 > now % 5 >= 0:
                self.defer_activity()

    def defer_activity(self):
        threads.deferToThread(self._activity)

ServiceUsingThreadingAndDeferred.run_forever在非反应器线程中运行。它调用defer_activitywhich 调用threads.deferToThread。不允许调用threads.deferToThread非反应器线程。大约有一个 Twisted API 可以安全地在非反应器线程中reactor.callFromThread调用:(它调度对其参数的调用以在反应器线程中运行)。

working.tac做同样的事情但很幸运,因此似乎可以在某些版本的 Twisted 上工作。它依赖于调用threads.deferToThread与实现方式交互的非反应器线程所产生的未定义行为callLater。不能保证它完全有效,或者该行为可以跨 Twisted 版本或平台移植。

如果要从非反应器线程中使用反应器的线程池,则需要编写如下内容:

from twisted.internet.threads import (
    blockingCallFromThread,
    deferToThread,
)

d = blockingCallFromThread(reactor, lambda: deferToThread(self._activity))

但是,您也不能在非反应器线程中使用dDeferred返回的)的任何方法。deferToThread

最有可能的是,如果可能的话,您应该重写逻辑ServiceUsingThreadingAndDeferred以使其与反应器兼容,然后您就可以避免所有这些恶作剧。

于 2017-11-04T22:23:08.207 回答