
I have a Python program that currently uses a TCP/IP client module I wrote to receive data from a streaming server. The server outputs lines of data.

My TCP client class is fairly primitive, and I want to refactor it to use a Twisted ReconnectingClientFactory.

The main program currently gets data from a readLines function in my TCP Client that 'yields' the lines as they are received.

The TCP client method is accessed by:

for msg_buffer in self.myTcpClient.readLines():
    # do some stuff with the data in msg_buffer

In my TCP Client the readLines method in essence looks like:

while True:
    newLine = self.sock.recv(self.buffer_size)
    yield newLine

When I implement the Twisted client, I'll want some way for it to behave as an iterator and yield data. I assume I'd do something in the protocol's dataReceived method.

I'm lost trying to figure out how this works. It appears to me that Twisted Deferreds are meant for this sort of use, but I can't figure out how to use a Deferred for my purpose (if my assumption about Deferreds is correct).

In a perfect world the Twisted client would yield the lines as they are received, so a call similar to the present method would do the job, i.e.:

class GetData(protocol):
    def dataReceived(self, data):
        yield data

But I think that's an oversimplification.

In summary, what I'm trying to do is implement a reconnecting Twisted TCP client that behaves something like my readLines method and can be accessed more or less like:

for msg_buffer in self.twistedTcpClient.readLines():

Any pointers will be much appreciated.

UPDATE: I just stumbled across 'Crochet' for Twisted. At first glance, Crochet appears to have been designed for exactly the kind of model that I need. I'll report back after some testing.


1 Answer


The Twisted way to do this is to write a protocol. Instead of doing this:

for line in self.twistedTcpClient.readLines():
    process_line(line)  # ...

you would write your own protocol (perhaps by subclassing twisted.protocols.basic.LineReceiver):

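from twisted.protocols.basic import LineReceiver

class MyProtocol(LineReceiver):
    ...
    def lineReceived(self, line):
        # Twisted calls this once for each complete line received.
        process_line(line)  # ...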

You want to refactor your code to use the lineReceived callback instead of an iteration loop.

What you wrote:

for line in self.twistedTcpClient.readLines():
    process_line(line)  # ...

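is problematic because Twisted is asynchronous. While the loop is blocked waiting on the twistedTcpClient.readLines() method, control never returns to the reactor, so Twisted has no way to do anything else.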

I recommend writing a protocol, but if you really insist on this iterator pattern, you may be able to do something like this:

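from twisted.internet.defer import inlineCallbacks

@inlineCallbacks
def my_func(self):
    while True:
        try:
            # Execution pauses here until the Deferred fires with a line.
            line = yield self.twistedTcpClient.getNextLine()
        except StopIteration:
            break

        process_line(line)  # ...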

Now, the tricky part is getting twistedTcpClient to return a Deferred each time getNextLine() is called. Perhaps something like this:

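from twisted.internet.defer import Deferred
from twisted.protocols.basic import LineReceiver

class MyProtocol(LineReceiver):
    ...
    def getNextLine(self):
        # Hand out a fresh Deferred that fires with the next line.
        self.defer_given_out = Deferred()
        return self.defer_given_out

    def lineReceived(self, line):
        self.defer_given_out.callback(line)

    def connectionLost(self, reason):
        # Signal the end of the stream to whoever is waiting.
        self.defer_given_out.errback(StopIteration())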

(This is only an example to illustrate the idea; you will have to extend it to handle the details.)

Answered 2013-11-15T18:14:07.137