
I need to read some very large text files (100+ MB), process every line with a regular expression, and store the data in a structure. My structure inherits from defaultdict and has a read(self) method that reads the file named by self.file_name.

Look at this very simple (but not realistic) example, in which I'm not using a regular expression but just splitting each line:


import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()

class Container(defaultdict):
    """
    This class stores odd lines in self["odd"] and even lines in
    self["even"]. It is stupid, but it's only an example. In the real
    case the class has additional methods that do computation on the
    data that was read.
    """
    def __init__(self, file_name):
        if not isinstance(file_name, str):
            raise AttributeError("%s is not a string" % file_name)
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name
        self.lines_read = 0

    def read(self):
        print "start reading file %s" % self.file_name
        with open(self.file_name) as f:
            for line in f:
                self.lines_read += 1
                values = line.split()
                key = {0: "even", 1: "odd"}[self.lines_read % 2]
                self[key].append(values)
        print "read %d lines from file %s" % (self.lines_read, self.file_name)

def do(file_name):
    container = Container(file_name)
    container.read()
    return container.items()

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do, file_names)
    pool.close()
    pool.join()
    print "Finish"      

At the end I need to join every result into one container; keeping the order of the lines is important (a sketch of the join step follows below). My approach is too slow when returning the values. Is there a better solution? I'm using Python 2.6 on Linux.
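For reference, a minimal sketch of the join step I mean (merge is a made-up helper; pool.map already returns the per-file results in the order of file_names):

from collections import defaultdict

def merge(results):
    # results is what pool.map returns: one items() list per file,
    # already in file_names order, so extending in sequence keeps
    # the lines of each key in reading order
    merged = defaultdict(list)
    for items in results:
        for key, values in items:
            merged[key].extend(values)
    return merged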


3 Answers


You're probably hitting two problems.

One of them was mentioned: you're reading multiple files at once. Those reads end up being interleaved, causing disk thrashing. You want to read each file whole, in one pass, and then only multithread the computation on the data, as sketched below.
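A rough sketch of that split, with the caveat that the file contents still get pickled into the worker processes (parse is a made-up stand-in for the real per-line work):

import multiprocessing

def parse(contents):
    # stand-in for the real per-line regex processing
    return [line.split() for line in contents.splitlines()]

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    # read sequentially, one whole file at a time, to avoid seek thrashing
    all_contents = [open(name).read() for name in file_names]
    pool = multiprocessing.Pool(len(all_contents))
    results = pool.map(parse, all_contents)
    pool.close()
    pool.join()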

Second, you're hitting the overhead of Python's multiprocessing module. It isn't actually using threads: it starts multiple processes and serializes the results back through a pipe. That's very slow for bulk data; in fact, it seems to be slower than the work being done in the worker (at least in this example). This is a real-world problem caused by the GIL.

If I modify do() to return None instead of container.items(), disabling the extra data copy, this example is faster than a single thread, as long as the files are already cached.
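Concretely, the timed variant is just the do() from the question with its return value dropped:

def do(file_name):
    container = Container(file_name)
    container.read()
    return None  # nothing gets pickled back through the pipe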

two threads: 0:00.36elapsed 168%CPU

one thread (replace pool.map with map): 0:00.52elapsed 98%CPU

Unfortunately, the GIL problem is fundamental and cannot be worked around from within Python.

answered 2010-01-15T04:36:36.590

You're creating a pool with as many workers as files. That may be too many. Usually, I aim to have the number of workers around the same as the number of cores.
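For example (note that multiprocessing.Pool() already defaults to the machine's core count when given no argument):

import multiprocessing

pool = multiprocessing.Pool(multiprocessing.cpu_count())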

The simple fact is that your final step is going to be a single process merging all the results together. There is no avoiding this, given your problem description. This is known as a barrier synchronization: all tasks have to reach the same point before any can proceed.

You should probably run this program multiple times, or in a loop, passing a different value to multiprocessing.Pool() each time, starting at 1 and going to the number of cores. Time each run, and see which worker count does best.
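A minimal harness for that sweep, reusing do and file_names from the question (time.time() is coarse, but good enough for runs this long):

import time
import multiprocessing

if __name__ == "__main__":
    for workers in range(1, multiprocessing.cpu_count() + 1):
        start = time.time()
        pool = multiprocessing.Pool(workers)
        pool.map(do, file_names)
        pool.close()
        pool.join()
        print "%d worker(s): %.2f seconds" % (workers, time.time() - start)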

The result will depend on how CPU-intensive (as opposed to disk-intensive) your task is. I would not be surprised if 2 were best if your task is about half CPU and half disk, even on an 8-core machine.

answered 2010-01-15T02:40:58.540

Multiprocessing is better suited to CPU- or memory-bound work, because the seek time of a spinning drive kills performance when you switch between files. Either load your log files onto a fast flash drive or some kind of memory disk (physical or virtual), or give up on multiprocessing.

answered 2010-01-15T02:29:59.793