1

我正在开发一个 python 脚本,它监视一个目录(使用 libinotify)的新文件,并对每个新文件进行一些处理,然后将其复制到存储服务器。我们使用的是 NFS 挂载,但存在一些性能问题,现在我们正在使用 FTP 进行测试。看起来 FTP 使用的资源比 nfs 少得多(负载始终低于 2,而 nfs 则高于 5)。

我们现在遇到的问题是在 TIME_WAIT 状态下保持打开的连接数量。存储在时间等待中的峰值约为 15k 连接。

我想知道是否有某种方法可以重新使用以前的连接进行新的传输。

任何人都知道是否有某种方法可以做到这一点?

谢谢

4

2 回答 2

1

这是一个新的答案,基于对前一个的评论。

我们将使用单个 TCP 套接字,并通过交替发送名称和内容来发送每个文件,如netstrings,每个文件都在一个大流中。

我假设 Python 2.6,双方的文件系统使用相同的编码,并且您不需要大量并发客户端(但您可能偶尔需要两个客户端,例如,真正的客户端和测试仪) . 而且我再次假设您有一个模块filegenerator,其generate()方法注册到inotify,将通知排队,并yield一一发送。

客户端.py:

import contextlib
import socket
import filegenerator

sock = socket.socket()
with contextlib.closing(sock):
    sock.connect((HOST, 12345))
    for filename in filegenerator.generate():
        with open(filename, 'rb') as f:
            contents = f.read()
            buf = '{0}:{1},{2}:{3},'.format(len(filename), filename, 
                                            len(contents), contents)
            sock.sendall(buf)

服务器.py:

import contextlib
import socket
import threading

def pairs(iterable):
    return zip(*[iter(iterable)]*2)

def netstrings(conn):
    buf = ''
    while True:
        newbuf = conn.recv(1536*1024) 
        if not newbuf:
            return
        buf += newbuf
        while True:
            colon = buf.find(':')
            if colon == -1:
                break
            length = int(buf[:colon])
            if len(buf) >= colon + length + 2:
                if buf[colon+length+1] != ',':
                    raise ValueError('Not a netstring') 
                yield buf[colon+1:colon+length+1]
                buf = buf[colon+length+2:]

def client(conn):
    with contextlib.closing(conn):
        for filename, contents in pairs(netstrings(conn)):
            with open(filename, 'wb') as f:
                f.write(contents)

sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
with contextlib.closing(sock):
    sock.bind(('0.0.0.0', 12345))
    sock.listen(1)
    while True:
        conn, addr = sock.accept()
        t = threading.Thread(target=client, args=[conn])
        t.daemon = True
        t.start()

如果您在 Windows 上需要超过 200 个客户端,在 linux 和 BSD(包括 Mac)上需要 100 个以上,在不太好的平台上需要十几个,您可能希望epoll在 Linuxkqueue上使用 BSD上的事件循环设计而不是线程设计, 和 Windows 上的 IO 完成端口。这很痛苦,但幸运的是,有一些框架可以为您解决所有问题。Twistedgevent是两个流行的(并且非常不同的)选择。

一件特别好的事情gevent是,您现在可以编写线程代码,并通过一些简单的更改将其变成基于事件的代码,就像魔术一样。

另一方面,如果你最终想要基于事件的代码,最好从一开始就学习和使用框架,这样你就不必处理所有繁琐的accepting 和循环recv直到您会收到完整的消息并干净地关闭等等,然后只写您关心的部分。毕竟,上面一半以上的代码基本上是每个服务器共享的东西的样板,所以如果你不必编写它,为什么还要麻烦呢?


在评论中,你说:

这些文件也是二进制文件,因此如果客户端编码与服务器编码不同,我可能会遇到问题。

'rb'请注意,我以二进制模式 ( and )打开每个文件'wb',并有意选择了一种协议 (netstrings),该协议可以处理二进制字符串,而不会尝试将它们解释为字符或将嵌入的 NUL 字符视为 EOF 或类似的东西。而且,当我str.format在 Python 2.x 中使用 , 时,它不会进行任何隐式编码,除非你给它unicode提供字符串或给它基于区域设置的格式类型,我都没有这样做。(请注意,在 3.x 中,您需要使用bytes而不是str,这会更改一些代码。)

换句话说,客户端和服务器编码不会进入它;您正在执行与 FTP 的 I 模式完全相同的二进制传输。


但是,如果您想要相反的情况,为目标系统自动传输文本和重新编码怎么办?有三种简单的方法可以做到这一点:

  1. 发送客户端的编码(在顶部一次,或每个文件一次),然后在服务器上,从客户端解码并重新编码到本地文件。
  2. 以文本/Unicode 模式执行所有操作,甚至是套接字。这很愚蠢,在 2.x 中也很难做到。
  3. 定义一种有线编码,比如 UTF-8。客户端负责将文件解码并编码为UTF-8进行发送;服务器负责在接收和编码文件上解码 UTF-8。

使用第三个选项,假设文件将采用您的默认文件系统编码,更改后的客户端代码为:

with io.open(filename, 'r', encoding=sys.getfilesystemencoding()) as f:
    contents = f.read().encode('utf-8')

在服务器上:

with io.open(filename, 'w', encoding=sys.getfilesystemencoding()) as f:
    f.write(contents.decode('utf-8'))

默认情况下,该io.open函数还使用通用换行符,因此客户端会将任何内容转换为 Unix 样式的换行符,而服务器将转换为其自己的本机换行符类型。

请注意,FTP 的 T 模式实际上进行任何重新编码;它只进行换行转换(以及更有限的版本)。

于 2013-07-05T06:45:29.657 回答
0

是的,您可以使用ftplib. 您所要做的就是不要关闭它们并继续使用它们。

例如,假设您有一个模块filegenerator,其generate()方法注册到inotify,将通知排队,并yield一一发送:

import ftplib
import os
import filegenerator

ftp = ftplib.FTP('ftp.example.com')
ftp.login()
ftp.cwd('/path/to/store/stuff')

os.chdir('/path/to/read/from/')

for filename in filegenerator.generate():
    with open(filename, 'rb') as f:
        ftp.storbinary('STOR {}'.format(filename), f)

ftp.close()

我对此有点困惑:

我们现在遇到的问题是在 TIME_WAIT 状态下保持打开的连接数量。

听起来您的问题不是为每个文件创建一个新连接,而是您从不关闭旧连接。在这种情况下,解决方案很简单:只需关闭它们。


要么,要么你试图并行地做它们,但不要意识到这就是你正在做的事情。

如果您想要一些并行性,但不是无限制的,您可以轻松地创建一个由 4 个线程组成的池,每个线程都有一个打开的ftplib连接,每个线程都从一个队列中读取,然后是一个inotify刚刚推送到该队列的线程。

于 2013-05-06T19:02:45.087 回答