
The data file I need to read is too large, and loading it into a list takes a long time. How can I use multiprocessing for this? In other words, I would like to parallelize reading the file and loading it into a list. Could you please help?

Basically, I have a table of data that I need to load into a list, as shown below. Reading the file takes no time, but loading it into the list (myList) takes about 1 minute. So, is it possible to parallelize this:

def load_file(self, fileName):    
    time_start = time.time()
    myList = []
    # mySet = set()
    lines = self.read_file(fileName)
    # time_end = time.time()
    # print fileName, ": loaded ",  round(time_end-time_start, 4)," seconds" 
    for line in lines:  
        content = line.split()   
        myList.append(content)
    time_end = time.time()
    print fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds"    
    return myList

def read_file(self, fileName): 
    filePath = self.data_directory + '\\' + fileName     
    try:
        with open(filePath, 'r') as f:
            # 'with' already closes the file; no explicit close() needed
            return f.readlines()
    except IOError:
        # open() raises IOError (not ValueError) when the file is missing
        print filePath + ' does not exist'

An ad hoc way might be (say the file has 2M lines, so len(lines) = 2M) to load the first 1M into myList1 and, in parallel, the second 1M into myList2, and then merge them with myList = myList1 + myList2. But that does not sound like best practice.
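Roughly, what I have in mind is something like the sketch below (the file name, the number of workers, and the even split are only placeholders; I have not measured whether the cost of pickling the line lists between processes eats the gain):

import multiprocessing

def split_chunk(lines):
    # Worker: split each raw line of one chunk into its fields
    return [line.split() for line in lines]

def load_file_parallel(file_path, workers=2):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    # Cut the line list into `workers` roughly equal slices
    size = (len(lines) + workers - 1) // workers
    parts = [lines[i:i + size] for i in range(0, len(lines), size)]
    pool = multiprocessing.Pool(workers)
    try:
        # map() keeps the original order, so the result is myList1 + myList2 + ...
        results = pool.map(split_chunk, parts)
    finally:
        pool.close()
        pool.join()
    return [row for part in results for row in part]

if __name__ == '__main__':
    myList = load_file_parallel('data.txt', workers=2)  # 'data.txt' is a placeholder
    print(len(myList))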


2 Answers


Basically, calling file.readlines() on a large file is never a good idea. I am not sure what this line does

self.read_file(fileName)

but I am afraid it calls file.readlines().

In general, you do not want a large file with millions of lines sitting in a list. That will eat up your memory.

If you want to filter/transform the lines of a large file and then write the resulting lines to another file, use iterators instead of loading the lines into a list.

I suggest structuring your solution along these lines. This approach handles files of several GB with ease.

def split_lines(file):
    with open(file) as f:
        for line in f:                
            yield line.split()

def process_splitted_lines(file):
    for splitted_line in split_lines(file):
        <do some other thing with splitted line>
        yield something

def write_result_lines(file):
    for something in process_splitted_lines(file):
        line = <do some other thing with something>
        <write line to resultfile>
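Filled in with a concrete (made-up) transformation so it actually runs, the pipeline could look like the following; the filtering step, the output format, and the file names are only examples standing in for the <do some other thing ...> placeholders above:

def split_lines(file):
    # Iterate lazily: only one line is in memory at a time
    with open(file) as f:
        for line in f:
            yield line.split()

def process_splitted_lines(file):
    for splitted_line in split_lines(file):
        # Example transformation: keep only rows with at least two fields
        if len(splitted_line) >= 2:
            yield splitted_line

def write_result_lines(file, resultfile):
    with open(resultfile, 'w') as out:
        for something in process_splitted_lines(file):
            # Example output: re-join the fields with a single space
            out.write(' '.join(something) + '\n')

write_result_lines('data.txt', 'result.txt')  # placeholder file names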
Answered 2014-01-09T19:01:49.290

I did some testing, and it was fun, haha. I do not think this is very efficient :) Maybe there is another, more efficient way?

import time
import multiprocessing

## Generate sample big file (~158 MB, 2M lines)
import random
chunks = "Lorem ipsum dolor sit amet consectetur adipisicing elit sed do eiusmod tempor incididunt ut labore et dolore magna aliqua".split()
with open(r"D:\testbig.txt", "w", encoding="utf-8") as f:
    for i in range(2000000):
        for nch in range(random.randrange(5,20)):
            f.write(random.choice(chunks))
            f.write(" ")
        f.write("\n")

# Proposed direct way
fileName = "foo"
time_start = time.time()
myList = []
# mySet = set()
with open(r"D:\testbig.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
time_end = time.time()
print(fileName, ": loaded ",  round(time_end-time_start, 4)," seconds" )
for line in lines:  
    content = line.split()   
    myList.append(content)
time_end = time.time()
print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds")
del myList

# Results:
## foo : loaded  0.9204  seconds
## foo :  2000000  rows loaded in 6.9107  seconds
## Press any key to continue . . .

# Workers method:
MAXPROCESS = 7
CHUNKLEN = 25600000

# The worker
def splitter(lines):
    myList = []
    for line in lines:
        content = line.split()
        myList.append(content)
    return myList

# The multiprocessing setup has to live inside a function (and run under the
# __main__ guard at the bottom), because worker processes re-import this module
def main():

    fileName = "foo"
    time_start = time.time()
    # Declare a pool of workers
    pool = multiprocessing.Pool(MAXPROCESS)
    results = []
    with open(r"D:\testbig.txt", "r", encoding="utf-8") as f:
        while True:
            # Read an amount of lines (about CHUNKLEN bytes)
            lines = f.readlines(CHUNKLEN)
            # End of file breaks the loop
            if len(lines) == 0: break
            # Queue data to be processed
            results.append(pool.apply_async(splitter, (lines,)))
    time_end = time.time()
    print(fileName, ": loaded ",  round(time_end-time_start, 4)," seconds" )
    # Wait for queue to be processed
    pool.close()
    pool.join()
    # Put list pieces together
    myList = []
    for result in results:
        myList += result.get()

    time_end = time.time()
    print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds")

# Guard the entry point so worker processes that re-import this module
# (e.g. on Windows) do not run main() again
if __name__ == "__main__":
    main()

# Results:

# MAXPROCESS = 4
# CHUNKLEN = 8192
## foo : loaded  5.0075  seconds
## foo :  2000000  rows loaded in 11.0446  seconds
## Press any key to continue . . .

# MAXPROCESS = 7
# CHUNKLEN = 25600
## foo : loaded  6.0839  seconds
## foo :  2000000  rows loaded in 9.1102  seconds
## Press any key to continue . . .

# MAXPROCESS = 7
# CHUNKLEN = 25600000
## foo : loaded  3.1199  seconds
## foo :  2000000  rows loaded in 11.7622  seconds
## Press any key to continue . . .
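One more variation I would try is streaming the chunks through Pool.imap instead of queuing them all with apply_async, so the parent never holds every pending chunk at once. This is just a sketch against the same test file with the same guessed MAXPROCESS and CHUNKLEN values; I have not benchmarked whether it is actually faster, since the per-chunk pickling cost remains:

import multiprocessing

MAXPROCESS = 7
CHUNKLEN = 25600000

def splitter(lines):
    # Worker: split every raw line of one chunk into fields
    return [line.split() for line in lines]

def read_chunks(path, chunklen):
    # Generator yielding blocks of lines of roughly `chunklen` bytes each
    with open(path, "r", encoding="utf-8") as f:
        while True:
            lines = f.readlines(chunklen)
            if not lines:
                break
            yield lines

def main():
    myList = []
    with multiprocessing.Pool(MAXPROCESS) as pool:
        # imap preserves chunk order and pulls chunks from the generator lazily
        for part in pool.imap(splitter, read_chunks(r"D:\testbig.txt", CHUNKLEN)):
            myList.extend(part)
    print(len(myList), "rows loaded")

if __name__ == "__main__":
    main()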
Answered 2014-01-09T18:53:53.207