3

我有两个中等大小的 ascii 文件,其中包含固定格式的数据。我需要测试第一个文件的一行中的 6 个给定字段是否匹配(在给定的容差范围内)第二个文件的任何行上的六个字段,然后输出一个公共行以继续处理。

我目前正在使用 fortran 样式的行阅读器拆分文件中的每一行,并为每个列表中的每个元素生成一个具有正确类型的列表列表。我将两个文件中的列表列表存储在内存中,同时对它们进行操作

我需要比较的字段都是浮点数,我目前正在使用以下类型的流:

tol = 0.01
for entry1 in file1List:
    for entry2 in file2List:
        if (abs(entry1[1] - entry2[1]) < tol and abs(entry1[2] - entry2[2]) < tol 
            and abs(entry1[3] - entry2[3]) < tol and abs(entry1[4] - entry2[4]) < tol
            and abs(entry1[5] - entry2[5]) < tol and abs(entry1[6] - entry2[6]) < tol):
            print entry1,entry2

在只包含少量行的文件上执行此操作很好,但超过 30000 行仅此部分的执行时间超过 1 分钟!

我相当肯定必须有一个更快的比较方法,但我很难找到它,任何帮助将不胜感激。

4

4 回答 4

2

如果您可以将元素存储在numpy 数组中file1listfile2list那么您的比较就会变得更加简单(而且可能更快):

for entry1 in file1list:
    for entry2 in file2list:
        if(np.all(np.abs(entry1[1:7] - entry2[1:7]) > tol)):
            print entry1,entry2

转换为 numpy 数组很轻松:

file1list = [ np.array(entry) for entry in file1list ]

对于 sorted 来说,这会更好一些file2list

file2list=sorted(file2list,key=operator.itemgetter(1,2,3,4,5,6))
for entry1 in file1list:
    for entry2 in file2list:
        d=np.abs(entry1[1:7]-entry2[1:7])
        if(np.all(d < tol)):
            print entry1,entry2
        elif(d[0] > tol):
            break 
于 2012-07-23T18:16:57.370 回答
1

您当前的处理是O(n m)n文件 1 中的行m数和文件 2 中的行数在哪里。当 时n ~ m ~ 30,000,它会进行 900,000,000 次比较:毫不奇怪,需要一分钟!

由于允许您偏离公差,这使情况变得更加复杂:如果它是完全匹配的,您可以从其中一个文件中创建一个值的字典,然后有O(m)时间构建字典以及O(n)进行查找的时间,因为基于散列的字典具有恒定时间查找。

一种有点相似的可能性是制作一个四舍五入值的字典,以便其中的内容由相同的值作为键,然后在找到可能的匹配项时tol确保所有内容都在其中。tol您可以通过将每个条目四舍五入到略大于tol: 也就是说,如果tol1e-3,则 key 基于四舍五入到的条目1e-2。这有多有效取决于您的值与 的分布tol,但它应该非常好。

也就是说,做这样的事情(未经测试,但它应该可以工作):

 import math
 import operator

 cmp_fields = operator.itemgetter(1, 2, 3, 4, 5, 6)

 # higher-order function to get a key-getting function for a given tolerance
 def key_getter(tol):
     prec = -math.log10(tol) - 1
     def inner(entry):
         return tuple(math.round(x, prec) for x in cmp_fields(entry))
     return inner

 # make the key function we want here
 key_fn = key_getter(tol)

 # build the dictionary of entries from file2: key maps to a list of entries
 file_2_entries = collections.defaultdict(list)
 for entry in file2List:
     file_2_entries[key_fn(entry)].append(entry)

 for entry1 in file1List: # for each entry from file1...
     # check only the potential matches based on rounding from 2
     for entry2 in file_2_entries[key_fn(entry2)]:
         if all(abs(x1 - x2) < tol for x1, x2
                in zip(map(cmp_fields, (entry1, entry2)))):
             print entry1, entry2
于 2012-07-23T18:03:29.507 回答
1

您当前方法的问题是您将第一个文件中的每一行与第二个文件中的每一行进行比较。为了减少运行时间,您需要一种方法来短路第二个for循环,并使用一些逻辑,例如“如果此迭代匹配某个条件,那么我可以跳出这个循环,因为任何后续条目都不可能匹配”。

要支持此逻辑,您首先需要对两个列表进行排序,例如:

from operator import itemgetter
comp_fields = itemgetter(1, 2, 3, 4, 5, 6)
file1List.sort(key=comp_fields)
file2List.sort(key=comp_fields)
for entry1 in file1List:
    for entry2 in file2List:
        if (abs(entry1[1] - entry2[1]) < tol and abs(entry1[2] - entry2[2]) < tol 
            and abs(entry1[3] - entry2[3]) < tol and abs(entry1[4] - entry2[4]) < tol
            and abs(entry1[5] - entry2[5]) < tol and abs(entry1[6] - entry2[6]) < tol):
            print entry1,entry2
        elif entry2[1] - entry1[1] >= tol:
            # we know subsequent lines in file2List can't match due to ordering
            break

这是一个非常幼稚的解决方案,例如,一个可能的改进是跟踪起点,例如,如果在上一次迭代中file1List我们在找到匹配项之前进入了 2000 行file2List,那么在下一次迭代中file1List我们可以开始迭代file2List在第 2000 行。它也只在第一列上进行短路,并且您可以添加一些额外的逻辑以使其在后续列上也短路(由于存在容差,而不是要求完全匹配)。

于 2012-07-23T18:09:29.027 回答
0

您可以根据值对 file1List 和 file2List 进行排序

entry1[1]^2 + entry1[2]^2 + ... + entry1[6]^2

这是六空间中的两个向量度量列表。给定 file1List 中的向量,您可以使用 Newton-Raphson 方法(或二分法:Python [已经支持][1] this.

扫描 file2List 中与 file1List 的下一个值匹配的下一个值,您只需向前扫描,file2List 中的第一个值超过 tol 将意味着对 file1List 的特定值的搜索结束。

我相信原始 JOIN 实用程序的算法或多或少相似。

于 2012-07-23T18:37:01.547 回答