4

我这样编写客户端-服务器应用程序:client(c#) <-> server (twisted; ftp proxy and additional functional) <-> ftp server

服务器有两个类:我自己的类协议继承自 LineReceiever 协议和 FTPClient 来自 twisted.protocols.ftp。

但是当客户端发送或获取大文件(10 Gb - 20 Gb)时,服务器会捕获 MemoryError。我的代码中没有使用任何缓冲区。当调用 transport.write(data) 数据附加到反应器编写器的内部缓冲区时会发生这种情况(如果我错了,请纠正我)。

我应该用什么来避免这个问题?还是我应该改变解决问题的方法?

我发现对于大流,我应该使用 IConsumer 和 IProducer 接口。但最后它会调用 transfer.write 方法,效果是一样的。还是我错了?

升级版:

这是文件下载/上传的逻辑(从 ftp 通过 Twisted 服务器到 Windows 上的客户端):

客户端向 Twisted 服务器发送一些标头,然后开始发送文件。扭曲的服务器接收标头,然后(如果需要)调用setRawMode(),打开 ftp 连接并从/向客户端接收/发送字节,并在所有关闭连接之后。这是上传文件的部分代码:

FTPManager 类

def _ftpCWDSuccees(self, protocol, fileName):
        self._ftpClientAsync.retrieveFile(fileName, FileReceiver(protocol))



class FileReceiver(Protocol):
    def __init__(self, proto):
        self.__proto = proto

    def dataReceived(self, data):
        self.__proto.transport.write(data)

    def connectionLost(self, why = connectionDone):
        self.__proto.connectionLost(why)

主要代理服务器类:

class SSDMProtocol(LineReceiver)
...

在 SSDMProtocol 对象(调用obSSDMProtocol)解析标头之后,它调用打开 ftp 连接(FTPClientfrom twisted.protocols.ftp)并设置 FTPManager 字段 _ftpClientAsync 的对象_ftpCWDSuccees(self, protocol, fileName)protocol = obSSDMProtocol在文件的字节收到时调用dataReceived(self, data)FileReceiver 对象的调用。

并且当self.__proto.transport.write(data)被调用时,数据附加到内部缓冲区比发送回客户端更快,因此内存耗尽。当缓冲区达到一定大小时我可以停止读取并在缓冲区全部发送到客户端后恢复读取?或类似的东西?

4

1 回答 1

14

如果您将 20 GB(千兆位?)字符​​串传递给transport.write,您将需要至少 20 GB(千兆位?)的内存 - 可能更像是 40 或 60,因为在 Python 中处理字符串时需要额外的复制.

即使您从未将单个字符串传递到transport.write20 GB(千兆位?),如果您transport.write以比您的网络处理速度更快的速度反复调用短字符串,发送缓冲区最终会变得太大而无法放入内存,而您会遇到一个MemoryError

这两个问题的解决方案是生产者/消费者系统。使用IProducerandIConsumer给你的好处是你永远不会有一个 20 GB(千兆位?)的字符串,你永远不会用太多较短的字符串填满发送缓冲区。网络将受到限制,因此字节的读取速度不会超过您的应用程序处理它们并忘记它们的速度。您的字符串最终将达到 16kB - 64kB 的大小,应该很容易放入内存中。

您只需要调整您的使用FileReceiver以包括将传入连接注册为传出连接的生产者:

class FileReceiver(Protocol):
    def __init__(self, outgoing):
        self._outgoing = outgoing

    def connectionMade(self):
        self._outgoing.transport.registerProducer(self.transport, streaming=True)

    def dataReceived(self, data):
        self._outgoing.transport.write(data)

现在只要self._outgoing.transport发送缓冲区填满,它就会告诉self.transport暂停。一旦发送缓冲区清空,它将告诉self.transport恢复。 self.transport现在了解如何在 TCP 级别执行这些操作,以便进入服务器的数据也将减慢。

于 2012-10-15T13:55:49.213 回答