2

我想使用 Twisted 从队列传输数据。我目前使用推送生产者来轮询队列中的项目并写入传输。

class Producer:

    implements(interfaces.IPushProducer)

    def __init__(self, protocol, queue):
        self.queue = queue
        self.protocol = protocol

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            try:
                data = self.queue.get_nowait()
                logger.debug("Transmitting: '%s'", repr(data))
                data = cPickle.dumps(data)
                self.protocol.transport.write(data + "\r\n")
            except Empty:
                pass

    def pauseProducing(self):
        logger.debug("Transmitter paused.")
        self.paused = True

    def stopProducing(self):
        pass

问题是,数据的发送非常不规则,如果队列中只有一项,则永远不会发送数据。似乎 Twisted 一直等到要传输的数据增长到特定值,直到它传输它。我实施制片人的方式是否正确?我现在可以强制 Twisted 传输数据

我也尝试过使用 pull producer,但 Twisted 根本不调用resumeProducing()它的方法。resumeProducer()使用拉生产者时,我是否必须从外部调用该方法?

4

2 回答 2

2

如果没有看到完整的示例(也就是说,没有看到向消费者注册它的代码以及将项目放入该队列的代码),很难说为什么你的生产者不能很好地工作。

但是,您可能会遇到的一个问题是,如果您的队列在被调用时为resumeProducing,那么您根本不会向消费者写入任何字节。当项目被放入队列时,它们将永远坐在那里,因为消费者不会resumeProducing再次调用您的方法。

这可以推广到队列中没有足够数据导致消费者调用pauseProducing您的生产者的任何其他情况。作为推送生产者,您的工作是继续自己生产数据,直到消费者调用pauseProducing(或stopProducing)。

对于这种特殊情况,这可能意味着无论何时您要将某些内容放入该队列 - 停止:检查生产者是否没有暂停,如果没有,则将其写入消费者。仅在生产者暂停时将项目放入队列。

于 2010-09-18T23:06:31.693 回答
-1

以下是两种可能的解决方案:

1) 定期轮询您的本地应用程序以查看您是否有其他数据要发送。

注意。这依赖于twisted 中deferLater 方法的周期性异步回调。如果您需要一个按需发送数据的响应式应用程序,或者一个长时间运行的阻塞操作(例如,使用自己的事件循环的 ui),那么它可能不合适。

代码:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time

# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
  while True:
    protocol.send(b"Hello World\n")
    yield deferLater(reactor, 2, lambda: None)

# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):

   def connectionMade(self):
     self.transport.registerProducer(self, True)
     gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
     self.task = cooperate(gen)

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def pauseProducing(self):
     print 'Workload paused'
     self.task.pause()

   def resumeProducing(self):
     print 'Workload resumed'
     self.task.resume()

   def stopProducing(self):
     print 'Workload stopped'
     self.task.stop()

   def connectionLost(self, reason):
     print 'Connection lost'
     try:
       self.task.stop()
     except:
       pass

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

2) 手动跟踪协议实例并使用来自不同线程的 reactor.callFromThread()。让您摆脱另一个线程中的长时间阻塞操作(例如 ui 事件循环)。

代码:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading

# Connection
protocol = None

# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
  def run(self):
    while True:
      print("Thread loop")
      time.sleep(random.randint(0, 4))
      if protocol is not None:
        reactor.callFromThread(self.dispatch)
  def dispatch(self):
    global protocol
    protocol.send("Hello World\n")

# Push protocol
class PushProtocol(Protocol):

   def connectionMade(self):
     global protocol
     protocol = self

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def connectionLost(self, reason):
     print 'Connection lost'

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Start thread
other = SomeThread()
other.start()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

就个人而言,我发现 IPushProducer 和 IPullProducer 需要定期回调,这使得它们不太有用。其他人不同意……耸耸肩。任你选。

于 2013-06-26T16:17:50.137 回答