1

我正在连接一个服务器,该服务器将向我发送需要逐行处理的流数据。所以我必须解析出单独的行,然后处理每一行。下面的代码似乎工作得很好,但我想知道是否有任何标准的设计模式来做这种事情。或者这是要走的路?

队列是否会引入任何严重的开销?我需要它尽可能快速和高效,这也是我偏离了像 twisted 这样的库的原因。

import socket, multiprocessing

def receive_proc(s, q):
    data = ''
    while True:
        data += s.recv(4096)
        if '\n' in data:
            lines = data.split('\n')[:-1]
            for line in lines:
                if len(line) > 0:
                    q.put(line)
                    data = data.replace(line+'\n', '', 1)

q = multiprocessing.Queue()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 1234))

p = multiprocessing.Process(target=receive_proc, args=(s,q))
p.start()

while True:
    line = q.get()

    # do your processing here
4

1 回答 1

4

想要远离诸如twisted之类的东西当然有正当的理由,但我认为效率不在其中——我怀疑它们更有可能以正确的方式进行优化。性能是一头棘手的野兽,瓶颈通常不在您想象的地方,这就是为什么您需要在进行适当优化之前进行概要分析的原因。例如,框架可能已经努力将更多代码推送到 C 扩展中,这肯定会提高性能。如果性能是您的关键动机,第三方的东西可能是更安全的选择。此外,使用其他人针对各种不同的用例和环境测试和调整的代码存在很大的争议——如果你最终重新发明了太多的轮子,总是有可能缺少一些辐条的风险。

但是,您需要做的事情似乎很简单,因此安装和学习框架以及向代码添加另一个运行时依赖项的开销可能不合理。此外,如果您主要受 IO 限制,那么燃烧一些额外的 CPU 来进行处理不会有太大的不同。在过去,我当然避免了诸如扭曲之类的事情,仅仅是因为我知道自己编写它会更快(就我的时间而言)并且性能会“足够好”。我一直发现twisted 的回调系统使调试有点棘手——例如,获取错误消息可能有点令人担忧。这绝不是不可能的,很多人都非常成功地使用它,但我个人觉得它太“繁琐”,不适合简单的任务。

我认为在这种情况下,您将接收和处理拆分为自己的进程的想法可能是一种错误的经济——从套接字接收数据非常快,并且如果您在纯 Python 中进行大量处理,则可能占主导地位性能因素。但是,如果不知道你在做什么处理,我不能肯定地说。如果这将是耗时和/或 CPU 密集型的,并且您可以独立于前几行处理每一行,那么这可能是合理的,但您可能希望将处理转移到一整套工作进程。根据您现有的代码,这非常容易 - 只需将主进程设置为接收器而不是“从属”,并创建一个共享一个Queue. 每个工人都经过一个循环来选择下一个项目并产生结果。无论每个需要多长时间,他们都会在下一个可用时获得下一个项目(并将Queue为您处理)。

但是,如果您的处理循环也主要受 IO 限制(例如写入文件),那么您可能会发现单个进程实际上比将所有内容推入管道的开销要好。这取决于许多因素,包括您的 CPU 架构(某些系统使 CPU 内核之间的传输比其他系统更昂贵),但最终您不希望使用多个进程,除非您非常有信心它会给您带来性能优势。

无论如何,如果循环IO 绑定的,您可能会发现使用非阻塞 IO 的单个进程是要走的路。您可以使用 Python 的select模块自己完成此操作,或者您可能会使用eventletgevent 之类的库发现它更简洁。

不相关的 - 你从缓冲区中剥离 start 的方法效率很低 - 你不需要使用replace()你可以使用你现有的split(),如下所示:

while True:
    data += s.recv(4096)
    if '\n' in data:
        lines = data.split('\n')
        for line in lines[:-1]:
            if len(line) > 0:
                q.put(line)
        data = lines[-1]
于 2013-01-24T14:29:30.983 回答