3

我被要求编写一个连接到服务器的类,异步向服务器发送各种命令,然后将返回的数据提供给客户端。我被要求在 Python 中执行此操作,这对我来说是一门新语言。我开始四处挖掘,发现了 Twisted 框架,它提供了一些非常好的抽象(Protocol、ProtocolFactory、Reactor),如果我要推出自己的基于套接字的应用程序,我必须做很多事情。考虑到我必须解决的问题,这似乎是正确的选择。

我浏览了网络上的许多示例(主要是Krondo),但我仍然没有看到一个很好的示例来创建将通过线路发送多个命令并维护我创建的连接的客户端。在这种情况下,服务器(我无法控制)在发送响应后不会断开连接。那么,设计客户端的正确方法是什么,以便我可以以各种方式对服务器进行痒痒呢?

现在我这样做:

class TestProtocol(Protocol)
    def connectionMade(self):
         self.transport.write(self.factory.message)

class TestProtocolFactory(Factory):
    message = ''
    def setMessage(self, msg):
        self.message = msg

def main():
    f = TestProtocolFactory()
    f.setMessage("my message")
    reactor.connectTCP(...)
    reactor.run()

我真正想做的是self.transport.write(...)通过反应器调用(实际上,从另一个执行线程按需调用 TestProtocolFactory::setMessage() ),而不仅仅是在建立连接时。

4

3 回答 3

4

You may want to use a Service.

Services are pieces of functionality within a Twisted app which are started and stopped, and are nice abstractions for other parts of your code to interact with. For example, in this case you might have a SayStuffToServerService (I know, terrible name, but without knowing more about its job it was the best I could do here :) ) that exposed something like this:

class SayStuffToServerService:
    def __init__(self, host, port):
        # this is the host and port to connect to

    def sendToServer(self, whatToSend):
        # send some line to the remote server

    def startService(self):
        # call me before using the service. starts outgoing connection efforts.

    def stopService(self):
        # clean reactor shutdowns should call this method. stops outgoing
        # connection efforts.

(That might be all the interface you need, but it should be fairly clear where you can add things to this.)

The startService() and stopService() methods here are just what Twisted's Services expose. And helpfully, there is a premade Twisted Service which acts like a TCP client and takes care of all the reactor stuff for you. It's twisted.application.internet.TCPClient, which takes arguments for a remote host and port, along with a ProtocolFactory to take care of handling the actual connection attempt.

Here is the SayStuffToServerService, implemented as a subclass of TCPClient:

from twisted.application import internet

class SayStuffToServerService(internet.TCPClient):
    factoryclass = SayStuffToServerProtocolFactory

    def __init__(self, host, port):
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)

    def sendToServer(self, whatToSend):
        # we'll do stuff here

(See below for the SayStuffToServerProtocolFactory.)

Using this Service architecture is convenient in a lot of ways; you can group Services together in one container, so that they all get stopped and started as one when you have different parts of your app that you want active. It may make good sense to implement other parts of your app as separate Services. You can set Services as child services to application- the magic name that twistd looks for in order to know how to initialize, daemonize, and shut down your app. Actually yes, let's add some code to do that now.

from twisted.application import service

...

application = service.Application('say-stuff')

sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))

That's all. Now when you run this module under twistd (i.e., for debugging, twistd -noy saystuff.py), that application will be started under the right reactor, and it will in turn start the SayStuffToServerService, which will start a connection effort to localhost:65432, which will use the service's factory attribute to set up the connection and the Protocol. You don't need to call reactor.run() or attach things to the reactor yourself anymore.

So we haven't implemented SayStuffToServerProtocolFactory yet. Since it sounds like you would prefer that your client reconnect if it has lost the connection (so that callers of sendToServer can usually just assume that there's a working connection), I'm going to put this protocol factory on top of ReconnectingClientFactory.

from twisted.internet import protocol

class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    _my_live_proto = None
    protocol = SayStuffToServerProtocol

This is a pretty nice minimal definition, which will keep trying to make outgoing TCP connections to the host and port we specified, and instantiate a SayStuffToServerProtocol each time. When we fail to connect, this class will do nice, well-behaved exponential backoff so that your network doesn't get hammered (you can set a maximum wait time). It will be the responsibility of the Protocol to assign to _my_live_proto and call this factory's resetDelay() method, so that exponential backoff will continue to work as expected. And here is that Protocol now:

class SayStuffToServerProtocol(basic.LineReceiver):
    def connectionMade(self):
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self

    def connectionLost(self, reason):
        self.factory._my_live_proto = None
        del self.factory

    def sayStuff(self, stuff):
        self.sendLine(stuff)

    def lineReceived(self, line):
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass

This is implemented on top of twisted.protocols.basic.LineReceiver, but would work as well with any other sort of Protocol, in case your protocol isn't line-oriented.

The only thing left is hooking up the Service to the right Protocol instance. This is why the Factory keeps a _my_live_proto attribute, which should be set when a connection is successfully made, and cleared (set to None) when that connection is lost. Here's the new implementation of SayStuffToServerService.sendToServer:

class NotConnectedError(Exception):
    pass

class SayStuffToServerService(internet.TCPClient):

    ...

    def sendToServer(self, whatToSend):
        if self.factory._my_live_proto is None:
            # define here whatever behavior is appropriate when there is no
            # current connection (in case the client can't connect or
            # reconnect)
            raise NotConnectedError
        self.factory._my_live_proto.sayStuff(whatToSend)

And now to tie it all together in one place:

from twisted.application import internet, service
from twisted.internet import protocol
from twisted.protocols import basic

class SayStuffToServerProtocol(basic.LineReceiver):
    def connectionMade(self):
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self

    def connectionLost(self, reason):
        self.factory._my_live_proto = None
        del self.factory

    def sayStuff(self, stuff):
        self.sendLine(stuff)

    def lineReceived(self, line):
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass

class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    _my_live_proto = None
    protocol = SayStuffToServerProtocol

class NotConnectedError(Exception):
    pass

class SayStuffToServerService(internet.TCPClient):
    factoryclass = SayStuffToServerProtocolFactory

    def __init__(self, host, port):
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)

    def sendToServer(self, whatToSend):
        if self.factory._my_live_proto is None:
            # define here whatever behavior is appropriate when there is no
            # current connection (in case the client can't connect or
            # reconnect)
            raise NotConnectedError
        self.factory._my_live_proto.sayStuff(whatToSend)

application = service.Application('say-stuff')

sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))

Hopefully that gives enough of a framework with which to start. There is sometimes a lot of plumbing to do to handle client disconnections just the way you want, or to handle out-of-order responses from the server, or handle various sorts of timeout, canceling pending requests, allowing multiple pooled connections, etc, etc, but this should help.

于 2012-05-14T20:45:58.100 回答
4

要看。以下是一些可能性:

我假设

方法 1. 您有一个发送服务器的命令列表,但由于某种原因不能一次全部完成。在这种情况下,在前一个答案返回时发送一个新的:

class proto(parentProtocol):
    def stringReceived(self, data):
        self.handle_server_response(data)
        next_command = self.command_queue.pop()
        # do stuff

方法 2. 您发送给服务器的内容取决于服务器发送给您的内容:

class proto(parentProtocol):
    def stringReceived(self, data):
        if data == "this":
            self.sendString("that")
        elif data == "foo":
            self.sendString("bar")
        # and so on

方法 3. 你不关心服务器发送什么,你只想定期发送一些命令:

class proto(parentProtocol):
    def callback(self):
        next_command = self.command_queue.pop()
        # do stuff
    def connectionMade(self):
        from twisted.internet import task
        self.task_id = task.LoopingCall(self.callback)
        self.task_id.start(1.0)

方法 4:您的编辑现在提到从另一个线程触发。随意检查扭曲的文档,看看是否proto.sendString是线程安全的。您可能可以直接调用它,但我不知道。方法 3 线程安全的。只需从另一个线程填充队列(这是线程安全的)。

基本上你可以在你的协议中存储任意数量的状态;它会一直存在,直到你完成。您可以向服务器发送命令作为对它给您的消息的响应,或者您设置一些调度来完成您的工作。或两者。

于 2012-05-09T19:34:22.007 回答
0

twisted 框架是基于事件的编程;本质上,它的方法都是异步调用的,结果是通过defer对象获取的。

该框架的本质是适合协议开发的,只是你必须改变你对传统顺序编程的想法。Protocol 类就像一个有限状态机,具有以下事件:连接建立、连接丢失、接收数据。您可以将您的客户端代码转换为 FSM,然后很容易适应协议类。

下面是我想要表达的一个粗略的例子。有点胭脂,但这是我现在可以提供的:

class SyncTransport(Protocol):
    # protocol
    def dataReceived(self, data):
        print 'receive data', data
    def connectionMade(self):
        print 'i made a sync connection, wow'
        self.transport.write('x')
        self.state = I_AM_LIVING
    def connectionLost(self):
        print 'i lost my sync connection, sight'
    def send(self, data):
        if self.state == I_AM_LIVING:
            if data == 'x':
              self.transport.write('y')
           if data == 'Y':
              self.transport.write('z')
              self.state = WAITING_DEAD
        if self.state == WAITING_DEAD:
              self.transport.close()
于 2013-04-03T06:09:35.027 回答