2

我对 Python 和 Twisted 都很陌生,所以我可能只是没有正确理解事情,但我似乎陷入了需要帮助的地步。

我想要做的是在 SSL 连接上使用ReconnectingClientFactory 。我已经全部运行了,但是如果连接被断开,所有发送到传输的 write() 方法的数据都会被简单地丢弃而没有任何错误。实际调用的方法是twisted.protocols.tls.TLSMemoryBIOProtocol.write()

这是我认为正在发生的事情(从工作连接开始):

  • 连接丢失
  • 使用一些数据调用 write() 方法(此处为源代码)
  • self.disconnecting 是False数据传递给_write() 方法
  • _write 方法到达 _lostTLSConnection ,True然后运行return
  • 连接被重新获得,但没有数据发送,因为它没有在任何地方缓冲

这是客户端的简化版本:

from OpenSSL import SSL
from twisted.internet.protocol import (Protocol, ReconnectingClientFactory)
from twisted.internet import (reactor, ssl)
import struct

class MetricsServer(Protocol):
    streambuffer = bytearray()

    def connectionMade(self):
        self.transport.setTcpKeepAlive(True) # maintain the TCP connection
        self.transport.setTcpNoDelay(False) # allow Nagle algorithm
        print("connected to server")            

    def dataReceived(self, data):
        print("from server:", data)

    def connectionLost(self, reason):
        self.connected = 0
        print("server connection lost:", reason)

class MetricsServerFactory(ReconnectingClientFactory):
    protocol = MetricsServer
    maxDelay = 300 # maximum seconds between retries
    factor = 1.6180339887498948
    packet_sequence_number = 0
    active_connection = None

    def buildProtocol(self, addr):
        self.resetDelay()
        if self.active_connection == None:
            self.active_connection = self.protocol()
        return self.active_connection

    def get_packet_sequence_number(self):
        self.packet_sequence_number += 1
        return self.packet_sequence_number

    def send_data(self):
        print ("sending ssl packet")
        packet = struct.pack("!I", self.get_packet_sequence_number())
        self.active_connection.transport.write(packet)
        reactor.callLater(1.0, metrics_server.send_data)

class CtxFactory(ssl.ClientContextFactory):
    def getContext(self):
        self.method = SSL.TLSv1_METHOD
        ctx = ssl.ClientContextFactory.getContext(self)
        ctx.use_certificate_file('keys/client.crt')
        ctx.use_privatekey_file('keys/client.key')

        def verifyCallback(connection, x509, errnum, errdepth, ok):
            return bool(ok)
        ctx.set_verify(SSL.VERIFY_PEER, verifyCallback)
        ctx.load_verify_locations("keys/ca.pem")
        return ctx

if __name__ == "__main__":
    metrics_server = MetricsServerFactory()
    reactor.connectSSL('localhost', 8000, metrics_server, CtxFactory())
    reactor.callLater(3.0, metrics_server.send_data)
    reactor.run()

这是一个简单的服务器,它输出它接收到的数据:

from OpenSSL import SSL
from twisted.internet import ssl, reactor
from twisted.internet.protocol import Factory, Protocol

class Echo(Protocol):
    sent_back_data = False

    def dataReceived(self, data):
        print(' '.join("{0:02x}".format(x) for x in data))

def verifyCallback(connection, x509, errnum, errdepth, ok):
    return bool(ok)

if __name__ == '__main__':
    factory = Factory()
    factory.protocol = Echo

    myContextFactory = ssl.DefaultOpenSSLContextFactory(
        'keys/server.key', 'keys/server.crt'
        )
    ctx = myContextFactory.getContext()
    ctx.set_verify(
        SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
        verifyCallback
        )

    ctx.load_verify_locations("keys/ca.pem")
    reactor.listenSSL(8000, factory, myContextFactory)
    reactor.run()

重新创建问题的过程:

  • 首先,您需要生成自己的证书和 CA 才能正常工作
  • 先运行服务器
  • 运行客户端代码
  • 在服务器端等待一些输出然后结束程序
  • 注意客户端继续尝试发送数据
  • 重启服务器端
  • 注意服务器端会继续接收数据包,但是连接丢失时发送的数据包会被丢弃

作为一种解决方法,我尝试实现自己的缓冲区以在重新连接时发送数据,但遇到了另一个问题。我希望它在重新建立连接时发送数据,我能看到的唯一钩子是 Protocol.connectionMade()。但是,该方法在 TLS 握手实际完成之前被调用,因此它最终被_write() 中的异常处理程序捕获并放入另一个缓冲区以稍后发送。 但是,该缓冲区似乎只有在从另一端接收到数据时才被发送(这在我的情况下并不经常发生,也意味着数据可能以错误的顺序到达另一端,因为在接收数据之前可能会调用 write())。我还认为在接收到数据之前再次断开连接也会导致数据缓冲区被擦除。

编辑:为第一个问题添加了示例代码。我在工厂里有这个可能很奇怪active_connection,但我正试图让它作为一个单例工作。

4

1 回答 1

0

好的,我发现了我的解决方法的问题......我正在传递bytearray要写入传输的 a ,然后立即清除它,但没有意识到写入被推迟到我清除缓冲区之后。所以,我传递了一个副本,bytearray它现在似乎可以工作了。

每次对 write 的调用都必须通过检查它是否已连接来进行,这似乎仍然不太正确,因为它的整个想法ReconnectingClientFactory是它为您处理维护连接。if另外,我认为该语句与实际运行时可能会丢失连接write(),因此仍然可能丢失数据。

from OpenSSL import SSL
from twisted.internet.protocol import (Protocol, ReconnectingClientFactory)
from twisted.internet import (reactor, ssl)
import struct

class MetricsServer(Protocol):
    streambuffer = bytearray()

    def connectionMade(self):
        self.transport.setTcpKeepAlive(True) # maintain the TCP connection
        self.transport.setTcpNoDelay(False) # allow Nagle algorithm
        print("connected to server")
        if len(self.transport.factory.wrappedFactory.send_buffer) > 0:
            self.transport.write(bytes(self.transport.factory.wrappedFactory.send_buffer))
            self.transport.factory.wrappedFactory.send_buffer.clear()

    def dataReceived(self, data):
        print("from server:", data)

    def connectionLost(self, reason):
        self.connected = 0
        print("server connection lost:", reason)


class MetricsServerFactory(ReconnectingClientFactory):
    protocol = MetricsServer
    maxDelay = 300 # maximum seconds between retries
    factor = 1.6180339887498948
    packet_sequence_number = 0
    active_connection = None

    send_buffer = bytearray()

    def buildProtocol(self, addr):
        self.resetDelay()
        if self.active_connection == None:
            self.active_connection = self.protocol()
        return self.active_connection

    def get_packet_sequence_number(self):
        self.packet_sequence_number += 1
        return self.packet_sequence_number

    def send_data(self):
        print ("sending ssl packet")
        packet = struct.pack("!I", self.get_packet_sequence_number())
        if self.active_connection and self.active_connection.connected:
            self.active_connection.transport.write(packet)
        else:
            self.send_buffer.extend(packet)
        reactor.callLater(1.0, metrics_server.send_data)


class CtxFactory(ssl.ClientContextFactory):
    def getContext(self):
        self.method = SSL.TLSv1_METHOD
        ctx = ssl.ClientContextFactory.getContext(self)
        ctx.use_certificate_file('keys/client.crt')
        ctx.use_privatekey_file('keys/client.key')

        def verifyCallback(connection, x509, errnum, errdepth, ok):
            return bool(ok)
        ctx.set_verify(SSL.VERIFY_PEER, verifyCallback)
        ctx.load_verify_locations("keys/ca.pem")
        return ctx

if __name__ == "__main__":
    metrics_server = MetricsServerFactory()
    reactor.connectSSL('localhost', 8000, metrics_server, CtxFactory())
    reactor.callLater(3.0, metrics_server.send_data)
    reactor.run()
于 2013-01-03T16:02:19.620 回答