2

所以我只是想多处理并阅读文本文档中的每一行。有 660918 行,我知道它们的长度都是一样的。尽管使用以下代码,行的长度似乎发生了变化,但我不知道为什么。

import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self,in_q):
        multiprocessing.Process.__init__(self)
        self.in_q = in_q
    def run(self):      
        while True:
            try:
                in_q.get()
                temp_line = short_file.readline()
                temp_line = temp_line.strip().split()
                print len(temp_line)
                self.in_q.task_done()
            except:                              
                break     

if __name__ == "__main__":
    num_proc = 10
    lines = 100000 #660918 is how many lines there actually are
    in_q = multiprocessing.JoinableQueue()
    File = 'HGDP_FinalReport_Forward.txt'
    short_file = open(File)

    for i in range(lines):
        in_q.put(i)    

    for i in range(num_proc):
        worker = Worker(in_q)
        worker.start()
    in_q.join() 
4

2 回答 2

7

您正在主进程中打开一个文件,然后在子进程中从该文件中读取。你不能那样做。

在幕后,文件对象实际上是一个原始文件句柄和一个内存缓冲区。每个进程共享文件句柄,但每个进程都有自己的内存缓冲区。

假设所有的行都是 50 字节,内存缓冲区是 4096 字节。

进程 1 调用 readline,它将文件中的字节 0-4095 读取到其缓冲区中,然后在该缓冲区中扫描 50 个字节的换行符,并返回前 50 个字节。到目前为止,一切都很好。

进程 2 调用 readline,它将文件中的字节 4096-8191 读取到其缓冲区中,然后扫描该缓冲区以查找换行符。第一个在 4100 处,即 5 个字节,因此它返回前 5 个字节。

等等。

理论上你可以通过无缓冲 I/O 来解决这个问题,但实际上,为什么呢?为什么不直接阅读主进程中的行?除了避免这个问题之外,这也可能会提高并行性——I/O 本质上是顺序的,所以所有这些进程都会将大部分时间花在 I/O 上,这意味着它们对你没有任何好处。

附带说明一下,在运行循环的顶部附近,您正在执行 in_q.get() 而不是 self.in_q.get()。(这恰好起作用,因为 in_q 是一个永远不会消失的全局变量,而 self.in_q 只是它的一个副本,但你不想依赖它。)

于 2012-06-15T00:28:11.033 回答
1

因此,我将其更改为使用 Pool,并且似乎可以正常工作。以下更好吗?

import multiprocessing as mp

File = 'HGDP_FinalReport_Forward.txt'
#short_file = open(File)
test = []

def pro(temp_line):
    temp_line = temp_line.strip().split()
    return len(temp_line)

if __name__ == "__main__":
    with open("HGDP_FinalReport_Forward.txt") as lines:
        pool = mp.Pool(processes = 10)
        t = pool.map(pro,lines.readlines())
    print t
于 2012-06-15T16:31:18.323 回答