以下是两种可能的解决方案:
1) 定期轮询您的本地应用程序以查看您是否有其他数据要发送。
注意:这种做法依赖于 Twisted 中 deferLater 产生的周期性异步回调。如果您需要一个按需发送数据的响应式应用程序,或者需要运行长时间的阻塞操作(例如,使用自己事件循环的 UI),那么它可能不合适。
代码:
import time

from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.protocol import Factory
from twisted.internet.protocol import Protocol
from twisted.internet.task import TaskFinished, cooperate, deferLater
from zope.interface import implementer
# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol, interval=2,
                                             payload=b"Hello World\n"):
    """Endlessly send ``payload`` through ``protocol``, pausing between sends.

    This is a generator intended to be driven by ``cooperate``: each
    iteration calls ``protocol.send(payload)`` and then yields the Deferred
    returned by ``deferLater``, which fires ``interval`` seconds later.

    Args:
        reactor: Clock/reactor passed to ``deferLater`` to schedule the delay.
        protocol: Object exposing ``send(data)``; receives every payload.
        interval: Delay in seconds between two sends (default 2).
        payload: Bytes sent on every iteration (default ``b"Hello World\\n"``).

    Yields:
        The Deferred returned by ``deferLater`` for each delay.
    """
    while True:
        protocol.send(payload)
        # The no-op callable only exists to give deferLater something to call;
        # the Deferred it returns is what paces the loop.
        yield deferLater(reactor, interval, lambda: None)
# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):
    """Echo protocol that also pushes periodic messages to its peer.

    On connect the protocol registers itself with its transport as a
    streaming (push) producer and starts a cooperative task that sends the
    periodic messages. The producer callbacks (pause/resume/stop) are
    forwarded to that task.
    """

    def connectionMade(self):
        """Register as a streaming producer and start the polling task."""
        # True => streaming (push) producer.
        self.transport.registerProducer(self, True)
        gen = periodically_poll_for_push_actions_async(
            self.transport.reactor, self)
        self.task = cooperate(gen)

    def dataReceived(self, data):
        """Echo received bytes back to the peer."""
        self.transport.write(data)

    def send(self, data):
        """Write ``data`` to the peer; called by the polling generator."""
        self.transport.write(data)

    def pauseProducing(self):
        """Pause the polling task."""
        print('Workload paused')
        self.task.pause()

    def resumeProducing(self):
        """Resume the polling task after a pause."""
        print('Workload resumed')
        self.task.resume()

    def stopProducing(self):
        """Stop the polling task permanently."""
        print('Workload stopped')
        self.task.stop()

    def connectionLost(self, reason):
        """Stop the polling task when the connection goes away."""
        print('Connection lost')
        try:
            self.task.stop()
        except TaskFinished:
            # The task was already stopped or finished (for example through
            # stopProducing), so there is nothing left to cancel.
            pass
# Push factory
class PushFactory(Factory):
    """Factory that hands every incoming connection its own PushProtocol."""

    def buildProtocol(self, addr):
        """Create the protocol for a new connection.

        ``addr`` is accepted to satisfy the factory interface but is not used.
        """
        connection_protocol = PushProtocol()
        return connection_protocol
# Run the reactor that serves everything
# Build the factory, attach it to a TCP4 server endpoint on port 8089, then
# hand control to the reactor; reactor.run() blocks until the reactor stops.
push_factory = PushFactory()
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(push_factory)
reactor.run()
2) 手动跟踪协议实例,并在其他线程中调用 reactor.callFromThread()。这样您就可以在另一个线程中运行长时间的阻塞操作(例如 UI 事件循环)。
代码:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading
# Module-level handle to the connected PushProtocol instance.
# Starts as None; PushProtocol.connectionMade assigns the instance, and
# SomeThread reads it to decide whether there is a peer to send to.
protocol = None
# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
    """Background thread that periodically asks the reactor to send a message.

    The thread never touches the transport directly: it schedules
    ``dispatch`` through ``reactor.callFromThread`` so the actual send is
    performed by the reactor.
    """

    def run(self):
        """Loop forever: sleep 0-4 seconds, then schedule a send if connected."""
        while True:
            print("Thread loop")
            time.sleep(random.randint(0, 4))
            if protocol is not None:
                reactor.callFromThread(self.dispatch)

    def dispatch(self):
        """Send the greeting through the current connection, if any."""
        # Re-read the module-level handle here: this call is executed later
        # than the check in run(), so the handle may have changed meanwhile.
        current = protocol
        if current is not None:
            # Bytes, not text: the transport writes raw bytes.
            current.send(b"Hello World\n")
# Push protocol
class PushProtocol(Protocol):
    """Echo protocol that publishes itself through the module-level ``protocol``.

    Code outside the protocol (for example another thread, via
    ``reactor.callFromThread``) uses that module-level name to reach the
    connection and call ``send``.
    """

    def connectionMade(self):
        """Publish this instance as the current connection."""
        global protocol
        protocol = self

    def dataReceived(self, data):
        """Echo received bytes back to the peer."""
        self.transport.write(data)

    def send(self, data):
        """Write ``data`` to the peer."""
        self.transport.write(data)

    def connectionLost(self, reason):
        """Report the disconnect and withdraw this instance if it is current."""
        global protocol
        print('Connection lost')
        # Only clear the handle if it still points at this connection; a newer
        # connection may already have replaced it.
        if protocol is self:
            protocol = None
# Push factory
class PushFactory(Factory):
    """Factory producing one PushProtocol instance per accepted connection."""

    def buildProtocol(self, addr):
        """Return a brand-new PushProtocol; the peer address ``addr`` is ignored."""
        new_protocol = PushProtocol()
        return new_protocol
# Start thread
other = SomeThread()
# SomeThread.run() loops forever; mark the thread as a daemon so it does not
# keep the process alive once reactor.run() has returned.
other.daemon = True
other.start()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
就个人而言,我认为 IPushProducer 和 IPullProducer 都需要周期性回调,这使得它们不那么实用。也有人不同意这一点……(耸肩)。您可以自行选择。