1

我正在尝试同时使用 Python3 解析两个大文件。如此处所示:

dict = {}
row = {}
with open(file1, "r") as f1, open(file2, "r") as f2:
  zipped = zip(f1, f2)
  for line_f1, line_f2 in zipped:
    # parse the lines and save the line information in a dictionary 
    row = {"ID_1":line_f1[0], "ID_2":line_f2[0], ...}

    # This process takes roughly 0.0005s each time
    # it parses each pair of lines at once and returns an output
    # it doesn't depend on previous lines or lines after
    output = process(row) 

    # output is a string, add it to dict
    if output in dict:
       dict[output] += 1
    else:
       dict[output] = 1
return dict

当我用两个较小的文本文件(每个 30,000 行,文件大小 = 13M)测试上述代码时,完成循环大约需要 150 秒。

当我使用两个大文本文件(每个文件 9,000,000 行,文件大小 = 3.8G)进行测试时,没有循环中的处理步骤,大约需要 670 秒。

当我在流程步骤中使用相同的两个大文本文件进行测试时。我计算出每 10,000 件物品大约需要 60 秒。当迭代次数变大时,时间并没有增长。

但是,当我将此作业提交到共享集群时,一对大文件需要超过 36 个小时才能完成处理。我试图弄清楚是否有任何其他方式来处理文件,以便它可以更快。任何建议,将不胜感激。

提前致谢!

4

1 回答 1

0

这只是一个假设,但您的进程可能会在每次触发 I/O 以获取一对线路时浪费其分配的 CPU 插槽。您可以尝试一次读取多组行并分块处理,这样您就可以充分利用在共享集群上获得的每个 CPU 时间段。

from collections import deque
chunkSize = 1000000 # number of characters in each chunk (you will need to adjust this)
chunk1    = deque([""]) #buffered lines from 1st file
chunk2    = deque([""]) #buffered lines from 2nd file
with open(file1, "r") as f1, open(file2, "r") as f2:
    while chunk1 and chunk2:
        line_f1 = chunk1.popleft()
        if not chunk1:
            line_f1,*more = (line_f1+file1.read(chunkSize)).split("\n")
            chunk1.extend(more)
        line_f2 = chunk2.popleft()
        if not chunk2:
            line_f2,*more = (line_f2+file2.read(chunkSize)).split("\n")
            chunk2.extend(more)
        # process line_f1, line_f2
        ....

它的工作方式是读取一大块字符(必须大于最长的行)并将其分解为行。这些行被放置在队列中以进行处理。

因为块大小以字符数表示,所以队列中的最后一行可能不完整。

为了确保行在被处理之前是完整的,当我们到达队列中的最后一行时,会读取另一个块。附加字符被添加到不完整行的末尾,并对组合字符串执行行拆分。因为我们连接了最后(不完整的)行,所以该.split("\n")函数总是适用于从行边界开始的一段文本。

该过程继续(现已完成)最后一行,其余行被添加到队列中。

于 2020-03-06T20:44:38.473 回答