4

我正在使用单进程版本 python 进行频率字数统计:

#coding=utf-8
import string
import time
from collections import Counter
starttime = time.clock()
origin = open("document.txt", 'r').read().lower()
for_split = [',','\n','\t','\'','.','\"','!','?','-', '~']

#the words below will be ignoered when counting
ignored = ['the', 'and', 'i', 'to', 'of', 'a', 'in', 'was', 'that', 'had',
       'he', 'you', 'his','my', 'it', 'as', 'with', 'her', 'for', 'on']
i=0
for ch in for_split:
    origin = string.replace(origin, ch, ' ')
words = string.split(origin)
result = Counter(words).most_common(40)
for word, frequency in result:
    if not word in ignored and i < 10:
        print "%s : %d" % (word, frequency)
        i = i+1
print time.clock() - starttime

那么多处理版本看起来像:

#coding=utf-8
import time
import multiprocessing
from collections import Counter
for_split = [',','\n','\t','\'','.','\"','!','?','-', '~']
ignored = ['the', 'and', 'i', 'to', 'of', 'a', 'in', 'was', 'that', 'had',
       'he', 'you', 'his','my', 'it', 'as', 'with', 'her', 'for', 'on']
result_list = []

def worker(substr):
    result = Counter(substr)
    return result

def log_result(result):
    result_list.append(result)

def main():
    pool = multiprocessing.Pool(processes=5)
    origin = open("document.txt", 'r').read().lower()
 for ch in for_split:
         origin = origin.replace(ch, ' ')
    words = origin.split()
    step = len(words)/4
        substrs = [words[pos : pos+step] for pos in range(0, len(words), step)]
    result = Counter()
    for substr in substrs:
        pool.apply_async(worker, args=(substr,), callback = log_result)
    pool.close()
    pool.join()
    result = Counter()
    for item in result_list:
        result = result + item
    result = result.most_common(40)
    i=0
    for word, frequency in result:
        if not word in ignored and i < 10:
            print "%s : %d" % (word, frequency)
            i = i+1

if __name__ == "__main__":
        starttime = time.clock()
        main()
        print time.clock() - starttime

“document.txt”大约22M,我的笔记本电脑有核心,2G内存,第一个版本的结果是3.27s,第二个是8.15s,我改变了进程数(pool = multiprocessing.Pool( processes=5) ),从 2 到 10,结果几乎一样,这是为什么,我怎样才能让这个程序比单进程版本运行得更快?

4

1 回答 1

5

我认为这是与将单个字符串分发给工作人员并接收结果相关的开销。如果我使用示例文档(Dostojevski 的“犯罪与惩罚”)运行上面给出的并行代码,则运行大约需要 0.32 秒,而单进程版本只需 0.09 秒。如果我将worker函数修改为只处理字符串“test”而不是真实文档(仍然将真实字符串作为参数传递),运行时间会下降到 0.22 秒。但是,如果我将“test”作为参数传递给map_async函数,则运行时间会减少到 0.06 秒。因此,我会说,在您的情况下,程序的运行时间受到进程间通信开销的限制。

使用以下代码,我将并行版本的运行时间降低到 0.08 秒:首先,我将文件划分为多个(几乎)长度相等的块,确保各个块之间的边界与换行符一致。然后,我只需将块的长度和偏移量传递给每个工作进程,让它打开文件,读取块,处理它并返回结果。与通过 map_async 函数直接分发字符串相比,这似乎造成的开销要少得多。对于较大的文件大小,您应该能够使用此代码看到运行时的改进。此外,如果您可以容忍小的计数错误,您可以省略确定正确的块边界的步骤,而只是将文件拆分成同样大的块。在我的示例中,这会将运行时间降低到 0.04 秒,

#coding=utf-8
import time
import multiprocessing
import string
from collections import Counter
import os
for_split = [',','\n','\t','\'','.','\"','!','?','-', '~']
ignored = ['the', 'and', 'i', 'to', 'of', 'a', 'in', 'was', 'that', 'had',
       'he', 'you', 'his','my', 'it', 'as', 'with', 'her', 'for', 'on']
result_list = []

def worker(offset,length,filename):
    origin = open(filename, 'r')
    origin.seek(offset)
    content = origin.read(length).lower()

    for ch in for_split:
         content = content.replace(ch, ' ')

    words = string.split(content)
    result = Counter(words)
    origin.close()
    return result

def log_result(result):
    result_list.append(result)

def main():
    processes = 5
    pool = multiprocessing.Pool(processes=processes)
    filename = "document.txt"
    file_size = os.stat(filename)[6]
    chunks = []
    origin = open(filename, 'r')
    while True:
        lines = origin.readlines(file_size/processes)
        if not lines:
            break
        chunks.append("\n".join(lines))

    lengths = [len(chunk) for chunk in chunks]
    offset = 0

    for length in lengths:
        pool.apply_async(worker, args=(offset,length,filename,), callback = log_result)
        offset += length

    pool.close()
    pool.join()
    result = Counter()
    for item in result_list:
        result = result + item
    result = result.most_common(40)
    i=0
    for word, frequency in result:
        if not word in ignored and i < 10:
            print "%s : %d" % (word, frequency)
            i = i+1
if __name__ == "__main__":
    starttime = time.clock()
    main()
    print time.clock() - starttime
于 2013-08-18T15:55:08.433 回答