-1

我正在尝试编写一个简单的 TCP 服务器,它必须按顺序执行以下操作:

  1. 客户端连接到服务器,并且KEEPALIVE此连接的标志设置为 1。
  2. 服务器从客户端接收数据。
  3. 然后它计算作为列表的响应。
  4. 然后服务器将列表中的每一项逐一发送,同时等待来自客户端的显式 ACK,即,在发送列表中的单个项目之后,服务器等待来自客户端的 ACK 数据包,并且只有在收到 ACK 之后它是否继续以相同的方式发送其余项目。

以下是代码:

class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service

class MyProtocol(Protocol):

    def connectionMade(self):
         try:
             self.transport.setTcpKeepAlive(1)
         except AttributeError: 
             pass
         self.deferred = Deferred()
         self.deferred.addCallback(self.factory.service.compute_response)
         self.deferred.addCallback(self.send_response)

    def dataReceived(self, data):
         self.fire(data)

    def fire(self, data):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(data)

    def send_response(self, data):
        for item in data:
            d = Deferred()
            d.addCallback(self.transport.write)
            d.addCallback(self.wait_for_ack)
            d.callback(item)
        return   

    def wait_for_ack(self, dummy):
        try:
            self.transport.socket.recv(1024)
        except socket.error as e:
            print e
        return

在运行服务器和客户端时,我得到以下异常:

Resource temporarily unavailable

我了解此异常的原因 - 我正在尝试在非阻塞套接字上调用阻塞方法。

请帮助我找到解决此问题的方法。

4

1 回答 1

5

您的示例存在一些问题:

  1. 您没有compute_response在任何地方定义(除其他外),因此我无法运行您的示例。考虑将其设为 http://sscce.org
  2. 你永远不应该调用 Twisted 传输下的任何一个sendrecv一个套接字;让 Twisted 为您调用这些方法。在这种情况下,recv它将提供recvto的结果dataReceived
  3. 您不能依赖dataReceived接收完整的消息;数据包在传输过程中可能总是被任意分段,因此您需要有一个帧协议来封装您的消息。

但是,由于我的另一个答案非常拙劣,我欠你一个更彻底的解释,说明如何设置你想要做的事情。

正如您的问题所规定的,您的协议没有完全定义到足以给出答案;您不能使用原始 TCP 片段进行请求和响应,因为您的应用程序无法知道它们的开始和结束位置(参见上面的第 3 点)。所以,我为这个例子发明了一个小协议:它是一个行分隔的协议,客户端发送"request foo\n",服务器立即发送"thinking...\n",计算响应,然后发送"response foo\n"并等待客户端发送"ok";作为响应,服务器将发送下一"response ..."行,或者"done\n"表示它已完成发送响应的行。

作为我们的协议,我相信您问题的关键要素是您不能在 Twisted 中“等待确认”,或者就此而言,其他任何事情。您需要做的是按照“收到确认时......”来实现一些东西。

因此,当收到消息时,我们需要识别消息的类型:确认还是请求?

  1. 如果是请求,我们需要计算响应;当响应完成计算后,我们需要将响应的所有元素排入队列并发送第一个。
  2. 如果是确认,我们需要检查响应的传出队列,如果它有任何内容,则发送它的第一个元素;否则,发送“完成”。

这是一个完整的、可运行的示例,它实现了我以这种方式描述的协议:

from twisted.internet.protocol import ServerFactory
from twisted.internet.task import deferLater
from twisted.internet import reactor
from twisted.internet.interfaces import ITCPTransport
from twisted.protocols.basic import LineReceiver

class MyProtocol(LineReceiver):
    delimiter = "\n"
    def connectionMade(self):
        if ITCPTransport.providedBy(self.transport):
            self.transport.setTcpKeepAlive(1)
        self.pendingResponses = []

    def lineReceived(self, line):
        split = line.rstrip("\r").split(None, 1)
        command = split[0]
        if command == b"request":
            # requesting a computed response
            payload = split[1]
            self.sendLine("thinking...")
            (self.factory.service.computeResponse(payload)
             .addCallback(self.sendResponses))
        elif command == b"ok":
            # acknowledging a response; send the next response
            if self.pendingResponses:
                self.sendOneResponse()
            else:
                self.sendLine(b"done")

    def sendOneResponse(self):
        self.sendLine(b"response " + self.pendingResponses.pop(0))

    def sendResponses(self, listOfResponses):
        self.pendingResponses.extend(listOfResponses)
        self.sendOneResponse()

class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service

class MyService(object):
    def computeResponse(self, request):
        return deferLater(
            reactor, 1.0,
            lambda: [request + b" 1", request + b" 2", request + b" 3"]
        )

from twisted.internet.endpoints import StandardIOEndpoint
endpoint = StandardIOEndpoint(reactor)

endpoint.listen(MyFactory(MyService()))
reactor.run()

我已经让它在标准 I/O 上运行,这样你就可以运行它并输入它来感受它是如何工作的;如果您想在实际的网络端口上运行它,只需将其替换为不同类型的端点即可。希望这能回答你的问题。

于 2015-11-20T09:46:44.120 回答