So I'm running the code below, and when I call queue.qsize() after it runs, there are still around 450,000 items in the queue, which means most of the lines of the text file were never read. Any idea what is happening here?

from Queue import Queue
from threading import Thread

lines = 660918 # int(str.split(os.popen('wc -l HGDP_FinalReport_Forward.txt').read())[0]) - 1
queue = Queue()
File = 'HGDP_FinalReport_Forward.txt'
num_threads = 10
short_file = open(File)

class worker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
    def run(self):
        while True:
            try:
                self.queue.get()
                i = short_file.readline()
                self.queue.task_done() # signal to the queue that the task is done
            except:
                break

## This is where I should make the call to the threads

def main():
    for i in range(num_threads):
        worker(queue).start()
    queue.join()


    for i in range(lines): # put the range of the number of lines in the .txt file
        queue.put(i)

main()

2 Answers

It's hard to tell exactly what you're trying to do here, but if each line can be processed independently, multiprocessing is a much simpler choice that will handle all the synchronization for you. As a bonus, you don't have to know the number of lines in advance.

Basically,

import multiprocessing
pool = multiprocessing.Pool(10)

def process(line):
    return len(line) #or whatever

with open(path) as lines:
    results = pool.map(process, lines)
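
One caveat: on platforms where worker processes are spawned rather than forked (Windows, for example), the pool has to be created under an if __name__ == '__main__': guard so each worker can re-import the module safely. A minimal rearrangement of the same example (path is still just a placeholder for your file name):

import multiprocessing

def process(line):
    return len(line)  # or whatever

if __name__ == '__main__':
    # create the pool only in the main process, never at import time
    pool = multiprocessing.Pool(10)
    with open(path) as lines:
        results = pool.map(process, lines)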

Or, if you just want some kind of aggregate result from the lines, you can use reduce to cut the memory usage.

import operator
with open(path) as lines:
    result = reduce(operator.add, pool.map(process, lines))
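
Note that pool.map still builds the complete list of results before reduce ever sees it, so the saving above is limited. If the results themselves are large, pool.imap is a lazy variant that yields them as they become ready; a minimal sketch, assuming the same pool and process as above (the chunksize argument only controls how many lines are shipped to each worker at a time):

import operator

with open(path) as lines:
    # imap yields results incrementally instead of materializing the full list
    result = reduce(operator.add, pool.imap(process, lines, 1000))
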
answered 2012-06-06T18:04:41.643

So I tried doing this, but I'm a bit confused, because I need to pass one line at a time, and that doesn't seem to be what the code is doing.

import multiprocessing as mp

File = 'HGDP_FinalReport_Forward.txt'

def pro(temp_line):
    temp_line = temp_line.strip().split()
    return len(temp_line)

if __name__ == "__main__":
    with open(File) as lines:
        pool = mp.Pool(processes=10)
        t = pool.map(pro, lines)
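
As it turns out, pool.map does hand the function one line at a time: iterating over an open file yields individual lines, and map applies pro to each of them, collecting the return values in order. Minus the worker processes, it is equivalent to this single-process version:

with open("HGDP_FinalReport_Forward.txt") as lines:
    t = [pro(line) for line in lines]  # same result, computed serially
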
answered 2012-06-15T06:25:33.453