2

I have a set of ~300,000 tuples:

In [26]: sa = set(o.node for o in vrts_l2_5) 
In [27]: len(sa)
Out[27]: 289798
In [31]: random.sample(sa, 1)
Out[31]: [('835644', '4696507')]

Now I want to look up elements based on a common substring, e.g. the first 4 "digits" (the elements are in fact strings). This is my approach:

def lookup_set(x_appr, y_appr):
    return [n for n in sa if n[0].startswith(x_appr) and n[1].startswith(y_appr)]

In [36]: lookup_set('6652','46529')
Out[36]: [('665274', '4652941'), ('665266', '4652956')]

Is there a more efficient, i.e. faster, way to do this?


7 Answers

2

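If you can afford to keep two sorted copies of the tuples, you can do this in O(log(n) + m) time, where n is the number of tuples and m is the number of matching tuples. The sorting itself costs O(n log(n)), i.e. it is asymptotically slower than your naive approach, but it pays off if you have to perform a certain number of queries (more than log(n), which is almost certainly quite small).

The idea is that you can use bisection to find the candidates that have the correct first value and the candidates that have the correct second value, and then intersect these two sets.

Note, however, that you need a slightly odd kind of comparison: you care about all strings that start with the given argument. This simply means that when searching for the right-most match you should pad the key with 9s.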
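To make the padding trick concrete, here is a minimal sketch on a plain sorted list of digit strings, using the stdlib bisect functions. The helper name prefix_range and the sample keys are made up for illustration, and the sketch assumes every key consists only of decimal digits and is no longer than max_length.

```python
from bisect import bisect_left, bisect_right


def prefix_range(sorted_keys, prefix, max_length):
    """Return (lo, hi) so that sorted_keys[lo:hi] are the keys starting with prefix.

    Assumption: keys are strings of decimal digits, at most max_length long.
    """
    # Left edge: the prefix itself is the smallest string that starts with it.
    lo = bisect_left(sorted_keys, prefix)
    # Right edge: padding with '9' gives the largest digit string with that prefix.
    hi = bisect_right(sorted_keys, prefix.ljust(max_length, '9'))
    return lo, hi


# Hypothetical sample data, sorted lexicographically (as strings, not as numbers).
keys = sorted(['665266', '665274', '66530', '7', '664999', '6652'])
lo, hi = prefix_range(keys, '6652', max_length=6)
matches = keys[lo:hi]
print(matches)
```

Because hi is already one past the last match, the slice keys[lo:hi] needs no further adjustment.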

A complete working (although not heavily tested) piece of code:

from random import randint
from operator import itemgetter

first = itemgetter(0)
second = itemgetter(1)

sa = [(str(randint(0, 1000000)), str(randint(0, 1000000))) for _ in range(300000)]
f_sorted = sorted(sa, key=first)
s_sorted = sa
s_sorted.sort(key=second)
# longest string in either position, so the '9' padding covers both sort keys
max_length = max(len(s) for pair in sa for s in pair)

# See: bisect module from stdlib
def bisect_right(seq, element, key):
    lo = 0
    hi = len(seq)
    element = element.ljust(max_length, '9')
    while lo < hi:
        mid = (lo+hi)//2
        if element < key(seq[mid]):
            hi = mid
        else:
            lo = mid + 1
    return lo


def bisect_left(seq, element, key):
    lo = 0
    hi = len(seq)
    while lo < hi:
        mid = (lo+hi)//2
        if key(seq[mid]) < element:
            lo = mid + 1
        else:
            hi = mid
    return lo


def lookup_set(x_appr, y_appr):
    x_left = bisect_left(f_sorted, x_appr, key=first)
    x_right = bisect_right(f_sorted, x_appr, key=first)
    # bisect_right already returns the index one past the last match,
    # so the slice must not add 1 (that would let a non-matching tuple in)
    x_candidates = f_sorted[x_left:x_right]
    y_left = bisect_left(s_sorted, y_appr, key=second)
    y_right = bisect_right(s_sorted, y_appr, key=second)
    y_candidates = s_sorted[y_left:y_right]
    return set(x_candidates).intersection(y_candidates)

And a comparison with your initial solution:

In [2]: def lookup_set2(x_appr, y_appr):
   ...:     return [n for n in sa if n[0].startswith(x_appr) and n[1].startswith(y_appr)]

In [3]: lookup_set('123', '124')
Out[3]: set([])

In [4]: lookup_set2('123', '124')
Out[4]: []

In [5]: lookup_set('123', '125')
Out[5]: set([])

In [6]: lookup_set2('123', '125')
Out[6]: []

In [7]: lookup_set('12', '125')
Out[7]: set([('12478', '125908'), ('124625', '125184'), ('125494', '125940')])

In [8]: lookup_set2('12', '125')
Out[8]: [('124625', '125184'), ('12478', '125908'), ('125494', '125940')]

In [9]: %timeit lookup_set('12', '125')
1000 loops, best of 3: 589 us per loop

In [10]: %timeit lookup_set2('12', '125')
10 loops, best of 3: 145 ms per loop

In [11]: %timeit lookup_set('123', '125')
10000 loops, best of 3: 102 us per loop

In [12]: %timeit lookup_set2('123', '125')
10 loops, best of 3: 144 ms per loop

As you can see, this solution is about 240-1400 times faster (in these examples) than your naive approach.

If you have a large number of matches:

In [19]: %timeit lookup_set('1', '2')
10 loops, best of 3: 27.1 ms per loop

In [20]: %timeit lookup_set2('1', '2')
10 loops, best of 3: 152 ms per loop

In [21]: len(lookup_set('1', '2'))
Out[21]: 3587
In [23]: %timeit lookup_set('', '2')
10 loops, best of 3: 182 ms per loop

In [24]: %timeit lookup_set2('', '2')
1 loops, best of 3: 212 ms per loop

In [25]: len(lookup_set2('', '2'))
Out[25]: 33053

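As you can see, this solution is faster even when the number of matches is around 10% of the total size. However, if you try to match all of the data: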

In [26]: %timeit lookup_set('', '')
1 loops, best of 3: 360 ms per loop

In [27]: %timeit lookup_set2('', '')
1 loops, best of 3: 221 ms per loop

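it becomes slower (though not by that much). This is a rather peculiar case, however, and I doubt you will frequently match almost all of the elements.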

Note that the time it takes to sort the data is quite small:

In [13]: from random import randint
    ...: from operator import itemgetter
    ...: 
    ...: first = itemgetter(0)
    ...: second = itemgetter(1)
    ...: 
    ...: sa2 = [(str(randint(0, 1000000)), str(randint(0, 1000000))) for _ in range(300000)]

In [14]: %%timeit
    ...: f_sorted = sorted(sa2, key=first)
    ...: s_sorted = sorted(sa2, key=second)
    ...: max_length = max(len(s) for _,s in sa2)
    ...: 
1 loops, best of 3: 881 ms per loop

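As you can see, it takes less than one second to build the two sorted copies. The code above would actually be slightly faster, since it sorts the second copy "in place" (although Timsort may still need O(n) memory).

This means that if you have to perform more than about 6-8 queries, this solution will be faster.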


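Note: Python's standard library provides a bisect module. However, it does not allow a key parameter (although I remember reading that Guido wanted it, so it may be added in the future). Hence, if you want to use it directly, you have to use the "decorate-sort-undecorate" idiom.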

Instead of:

f_sorted = sorted(sa, key=first)

you would do:

f_sorted = sorted((first, (first,second)) for first,second in sa)

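That is, you explicitly insert the key as the first element of the tuple. Afterwards you can pass ('123', '') as the element to the bisect_* functions and they should find the correct index.

I decided to avoid this. I copy-pasted the code from the module's source and modified it slightly to provide a simpler interface for your use case.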
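For readers who would rather use the stdlib functions unmodified, one possible variation is to keep a parallel list of keys next to each sorted copy and bisect on that list. This is only a sketch under the same assumptions as above (digit strings, padding with 9s); the names build_index, candidates and lookup, and the sample data, are hypothetical and not part of the code in this answer. Newer Python versions (3.10+) also accept a key argument in the bisect functions; the sketch avoids it so that it runs on older versions too.

```python
from bisect import bisect_left, bisect_right
from operator import itemgetter


def build_index(pairs, field):
    """Sort pairs by one field and keep a parallel list with just that field."""
    ordered = sorted(pairs, key=itemgetter(field))
    keys = [pair[field] for pair in ordered]
    return keys, ordered


def candidates(index, prefix, max_length):
    """Return the tuples whose indexed field starts with prefix."""
    keys, ordered = index
    lo = bisect_left(keys, prefix)
    hi = bisect_right(keys, prefix.ljust(max_length, '9'))
    return ordered[lo:hi]


def lookup(by_x, by_y, x_appr, y_appr, max_length):
    """Intersect the candidates for the first and the second field."""
    return (set(candidates(by_x, x_appr, max_length))
            & set(candidates(by_y, y_appr, max_length)))


# Hypothetical sample data in the same shape as the question's tuples.
data = [('665274', '4652941'), ('665266', '4652956'),
        ('665300', '4652999'), ('123456', '4652000')]
max_len = max(len(s) for pair in data for s in pair)
by_x = build_index(data, 0)
by_y = build_index(data, 1)
result = lookup(by_x, by_y, '6652', '46529', max_len)
print(sorted(result))
```

The extra key lists cost additional memory, which is the price for not reimplementing the bisection loop.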


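One last remark: if you can convert the tuple elements to integers, the comparisons will be faster. However, most of the time will still be spent computing the intersection of the sets, so I do not know exactly how much it would improve performance.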

Answered 2013-10-03T12:28:08.747
1

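I went through and implemented the 4 suggested solutions to compare their efficiency. I ran the tests with different prefix lengths to see how the input affects performance. The performance of the trie and the sorted list is definitely sensitive to the length of the input, with both getting faster as the input gets longer (I think it is really sensitivity to the size of the output, since the output gets smaller as the prefix gets longer). However, the sorted set solution is definitely faster in all situations.

In these timing tests, sa contained 200000 tuples and each method was run 10 times (times in seconds):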

for prefix length 1
  lookup_set_startswith    : min=0.072107 avg=0.073878 max=0.077299
  lookup_set_int           : min=0.030447 avg=0.037739 max=0.045255
  lookup_set_trie          : min=0.111548 avg=0.124679 max=0.147859
  lookup_set_sorted        : min=0.012086 avg=0.013643 max=0.016096
for prefix length 2
  lookup_set_startswith    : min=0.066498 avg=0.069850 max=0.081271
  lookup_set_int           : min=0.027356 avg=0.034562 max=0.039137
  lookup_set_trie          : min=0.006949 avg=0.010091 max=0.032491
  lookup_set_sorted        : min=0.000915 avg=0.000944 max=0.001004
for prefix length 3
  lookup_set_startswith    : min=0.065708 avg=0.068467 max=0.079485
  lookup_set_int           : min=0.023907 avg=0.033344 max=0.043196
  lookup_set_trie          : min=0.000774 avg=0.000854 max=0.000929
  lookup_set_sorted        : min=0.000149 avg=0.000155 max=0.000163
for prefix length 4
  lookup_set_startswith    : min=0.065742 avg=0.068987 max=0.077351
  lookup_set_int           : min=0.026766 avg=0.034558 max=0.052269
  lookup_set_trie          : min=0.000147 avg=0.000167 max=0.000189
  lookup_set_sorted        : min=0.000065 avg=0.000068 max=0.000070

Here is the code:

import random
def random_digits(num_digits):
    return random.randint(10**(num_digits-1), (10**num_digits)-1)

sa = [(str(random_digits(6)),str(random_digits(7))) for _ in range(200000)]

### naive approach
def lookup_set_startswith(x_appr, y_appr):
    return [item for item in sa if item[0].startswith(x_appr) and item[1].startswith(y_appr) ]

### trie approach
from marisa_trie import RecordTrie

# make length of string in packed format big enough!
fmt = ">10p10p"
sa_tries = (RecordTrie(fmt, zip([unicode(first) for first, second in sa], sa)),
         RecordTrie(fmt, zip([unicode(second) for first, second in sa], sa)))

def lookup_set_trie(x_appr, y_appr):
 # lookup prefix in the appropriate trie and intersect the result
 return set(item[1] for item in sa_tries[0].items(unicode(x_appr))) & \
        set(item[1] for item in sa_tries[1].items(unicode(y_appr)))

### int approach
sa_ints = [(int(first), int(second)) for first, second in sa]

sa_lens = tuple(map(len, sa[0]))

def lookup_set_int(x_appr, y_appr):
    x_limit = 10**(sa_lens[0]-len(x_appr))
    y_limit = 10**(sa_lens[1]-len(y_appr))

    x_int = int(x_appr) * x_limit
    y_int = int(y_appr) * y_limit

    return [sa[i] for i, int_item in enumerate(sa_ints) \
        if (x_int <= int_item[0] and int_item[0] < x_int+x_limit) and \
           (y_int <= int_item[1] and int_item[1] < y_int+y_limit) ]

### sorted set approach
from operator import itemgetter

first = itemgetter(0)
second = itemgetter(1)

sa_sorted = (sorted(sa, key=first), sorted(sa, key=second))
max_length = max(len(s) for _,s in sa)

# See: bisect module from stdlib
def bisect_right(seq, element, key):
    lo = 0
    hi = len(seq)
    element = element.ljust(max_length, '9')
    while lo < hi:
        mid = (lo+hi)//2
        if element < key(seq[mid]):
            hi = mid
        else:
            lo = mid + 1
    return lo


def bisect_left(seq, element, key):
    lo = 0
    hi = len(seq)
    while lo < hi:
        mid = (lo+hi)//2
        if key(seq[mid]) < element:
            lo = mid + 1
        else:
            hi = mid
    return lo


def lookup_set_sorted(x_appr, y_appr):
    x_left = bisect_left(sa_sorted[0], x_appr, key=first)
    x_right = bisect_right(sa_sorted[0], x_appr, key=first)
    x_candidates = sa_sorted[0][x_left:x_right]
    y_left = bisect_left(sa_sorted[1], y_appr, key=second)
    y_right = bisect_right(sa_sorted[1], y_appr, key=second)
    y_candidates = sa_sorted[1][y_left:y_right]
    return set(x_candidates).intersection(y_candidates)     


####
# test correctness
ntests = 10

candidates = [lambda x, y: set(lookup_set_startswith(x,y)), 
              lambda x, y: set(lookup_set_int(x,y)),
              lookup_set_trie, 
              lookup_set_sorted]
print "checking correctness (or at least consistency)..."
for dlen in range(1,5):
    print "prefix length %d:" % dlen,
    for i in range(ntests):
        print " #%d" % i,
        prefix = map(str, (random_digits(dlen), random_digits(dlen)))
        answers = [c(*prefix) for c in candidates]
        for i, ans in enumerate(answers):
            for j, ans2 in enumerate(answers[i+1:]):
                assert ans == ans2, "answers for %s for #%d and #%d don't match" \
                                    % (prefix, i, j+i+1)
    print


####
# time calls
import timeit
import numpy as np

ntests = 10

candidates = [lookup_set_startswith,
              lookup_set_int,
              lookup_set_trie, 
              lookup_set_sorted]

print "timing..."
for dlen in range(1,5):
    print "for prefix length", dlen

    times = [ [] for c in candidates ]
    for _ in range(ntests):
        prefix = map(str, (random_digits(dlen), random_digits(dlen)))

        for c, c_times in zip(candidates, times):
            tstart = timeit.default_timer()
            trash = c(*prefix)
            c_times.append(timeit.default_timer()-tstart)
    for c, c_times in zip(candidates, times):
        print "  %-25s: min=%f avg=%f max=%f" % (c.func_name, min(c_times), np.mean(c_times), max(c_times))
Answered 2013-10-03T18:19:19.857
1

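Integer operations are much faster than string operations (and integers use less memory, too).

So if you can compare integers instead, you will be much faster. I suspect something like this should work for you: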

# o.node is a tuple of two strings, so convert each element separately
sa = set(tuple(int(s) for s in o.node) for o in vrts_l2_5)

Then this may work for you:

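def lookup_set(samples, x_appr, x_len, y_appr, y_len):
    """
    x_appr == SSS0000  where S is the digit to search for
    x_len == number of digits to S (if SSS0000 then x_len == 4)

    Note: round() rounds to the nearest multiple of 10**x_len; it does not
    truncate, so values in the upper half of a range round up out of it.
    """
    return ((x, y) for x, y in samples if round(x, -x_len) == x_appr and round(y, -y_len) == y_appr)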

Also, it returns a generator, so you do not load all of the results into memory at once.

Updated to use the round method mentioned by Bakuriu.
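Since round() rounds to the nearest multiple rather than truncating, a prefix test on integers may be safer with floor division. The following is a minimal sketch of that variation, not the code this answer was tested with; the name lookup_ints, its parameters and the sample data are hypothetical, and it assumes all x values have the same number of digits (and likewise all y values).

```python
def lookup_ints(samples, x_prefix, x_len, y_prefix, y_len):
    """Yield (x, y) pairs whose leading digits equal x_prefix and y_prefix.

    x_len and y_len are the numbers of trailing digits to drop.
    Assumption: all x values have the same digit count, and so do all y values.
    """
    x_div = 10 ** x_len
    y_div = 10 ** y_len
    # Floor division drops the trailing digits, leaving only the prefix.
    return ((x, y) for x, y in samples
            if x // x_div == x_prefix and y // y_div == y_prefix)


# Hypothetical sample data: the question's example tuples converted to ints.
samples = {(665274, 4652941), (665266, 4652956), (665300, 4652999)}
found = sorted(lookup_ints(samples, 6652, 2, 46529, 2))
print(found)

# For comparison: round() moves 665274 up to 665300 instead of down to 665200.
print(round(665274, -2))
```

Like the version above, this still scans every tuple, so it only reduces the cost per comparison, not the number of comparisons.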

Answered 2013-10-03T09:31:03.413
1

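You could use a trie data structure. It is possible to build one with a tree of dict objects (see How to create a TRIE in Python), but there is a package, marisa-trie, that implements a memory-efficient version by binding to a C++ library.

I have not used this library before, but after playing around with it I got this working: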

from random import randint
from marisa_trie import RecordTrie

sa = [(str(randint(1000000,9999999)),str(randint(1000000,9999999))) for i in range(100000)]
# make length of string in packed format big enough!
fmt = ">10p10p"
sa_tries = (RecordTrie(fmt, zip((unicode(first) for first, _ in sa), sa)),
            RecordTrie(fmt, zip((unicode(second) for _, second in sa), sa)))

def lookup_set(sa_tries, x_appr, y_appr):
    """lookup prefix in the appropriate trie and intersect the result"""
    return (set(item[1] for item in sa_tries[0].items(unicode(x_appr))) &
            set(item[1] for item in sa_tries[1].items(unicode(y_appr))))

lookup_set(sa_tries, "2", "4")
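
For comparison, the dict-based alternative mentioned at the top of this answer could look roughly like the sketch below. It is a plain-Python illustration only, with no dependency on marisa-trie; the function names and the sample data are made up, and it will use far more memory than the C++-backed library.

```python
def build_trie(pairs, field):
    """Build a nested-dict trie keyed on the characters of pairs[i][field]."""
    root = {}
    for pair in pairs:
        node = root
        for ch in pair[field]:
            node = node.setdefault(ch, {})
        # The empty string cannot be a character, so it marks "a key ends here".
        node.setdefault('', []).append(pair)
    return root


def items_with_prefix(trie, prefix):
    """Return every stored pair whose indexed field starts with prefix."""
    node = trie
    for ch in prefix:
        node = node.get(ch)
        if node is None:
            return []
    found, stack = [], [node]
    while stack:  # iterative walk over the subtree below the prefix
        node = stack.pop()
        for key, child in node.items():
            if key == '':
                found.extend(child)
            else:
                stack.append(child)
    return found


def lookup_trie(tries, x_appr, y_appr):
    """Intersect the prefix matches of the first-field and second-field tries."""
    return (set(items_with_prefix(tries[0], x_appr))
            & set(items_with_prefix(tries[1], y_appr)))


# Hypothetical sample data in the same shape as the question's tuples.
data = [('665274', '4652941'), ('665266', '4652956'), ('665300', '4652999')]
tries = (build_trie(data, 0), build_trie(data, 1))
result = lookup_trie(tries, '6652', '46529')
print(sorted(result))
```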
Answered 2013-10-03T09:40:11.567
0

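There might be, but not by very much. str.startswith and `and` are both short-circuiting (they can return as soon as they find a failure), and indexing a tuple is a fast operation. Most of the time spent here will come from object lookups, such as finding the startswith method for each string. Probably the most worthwhile option is to run it under PyPy.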

Answered 2013-10-03T08:34:30.493
0

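A faster solution would be to build a dictionary (dict) with the first value as the key and the second as the value.

  1. Then search the ordered list of the dict's keys for the keys matching x_appr (an ordered list lets you use bisection to speed up the search in the key list). This gives a list of keys, call it k_list.

  2. Then look up the dict values whose key is in k_list and that match y_appr.

You can also apply the second step (values matching y_appr) before appending to k_list. That way k_list will contain exactly the keys of the matching elements of the dict.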
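
The two steps above can be sketched as follows. This is one possible reading of the idea, not code from this answer: the names build and lookup_dict and the sample data are hypothetical, each key maps to a list of second values because several tuples may share the same first value, and the bisection assumes digit strings padded with 9s up to max_length.

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict


def build(pairs):
    """Map each first value to the list of second values seen with it."""
    by_first = defaultdict(list)
    for x, y in pairs:
        by_first[x].append(y)
    return by_first, sorted(by_first)


def lookup_dict(by_first, sorted_keys, x_appr, y_appr, max_length):
    # Step 1: bisect the ordered key list for the keys starting with x_appr.
    lo = bisect_left(sorted_keys, x_appr)
    hi = bisect_right(sorted_keys, x_appr.ljust(max_length, '9'))
    k_list = sorted_keys[lo:hi]
    # Step 2: keep the values under those keys that start with y_appr.
    return [(x, y) for x in k_list for y in by_first[x]
            if y.startswith(y_appr)]


# Hypothetical sample data in the same shape as the question's tuples.
data = [('665274', '4652941'), ('665266', '4652956'),
        ('665300', '4652999'), ('665274', '9999999')]
by_first, sorted_keys = build(data)
result = lookup_dict(by_first, sorted_keys, '6652', '46529', max_length=7)
print(result)
```

Only the first field is found by bisection here; the second field is still checked with startswith on each candidate.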

Answered 2013-10-03T08:40:06.790
0

Here I just compared the 'in' method with the 'find' method:

The CSV input file contains a list of URLs.

# -*- coding: utf-8 -*-

### test perfo str in set

import re
import sys
import time
import json
import csv
import timeit

cache = set()

#######################################################################

def checkinCache(c):
  global cache
  for s in cache:
    if c in s:
      return True
  return False

#######################################################################

def checkfindCache(c):
  global cache
  for s in cache:
    if s.find(c) != -1:
      return True
  return False

#######################################################################

print "1/3-loading pages..."
with open("liste_all_meta.csv.clean", "rb") as f:
    reader = csv.reader(f, delimiter=",")
    for i,line in enumerate(reader):
      cache.add(re.sub("'","",line[2].strip()))

print "  "+str(len(cache))+" PAGES IN CACHE"

print "2/3-test IN..."
tstart = timeit.default_timer()
for i in range(0, 1000):
  checkinCache("string to find"+str(i))
print timeit.default_timer()-tstart

print "3/3-test FIND..."
tstart = timeit.default_timer()
for i in range(0, 1000):
  checkfindCache("string to find"+str(i))
print timeit.default_timer()-tstart

print "\n\nBYE\n"

Results, in seconds:

1/3-loading pages...
  482897 PAGES IN CACHE
2/3-test IN...
107.765980005
3/3-test FIND...
167.788629055


BYE

So the 'in' method is faster than the 'find' method :)
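
A smaller, self-contained way to repeat this comparison without the CSV file is sketched below with the timeit module. The URLs and the search string are made-up stand-ins for the real data, and the absolute numbers will differ from machine to machine, so treat the output only as a rough indication.

```python
import timeit

# Made-up data standing in for the URL cache loaded from the CSV file.
urls = ["http://example.com/page/%d" % i for i in range(10000)]
needle = "page/9999"


def any_in(cache, c):
    """Substring test with the `in` operator."""
    return any(c in s for s in cache)


def any_find(cache, c):
    """Substring test with str.find."""
    return any(s.find(c) != -1 for s in cache)


t_in = timeit.timeit(lambda: any_in(urls, needle), number=20)
t_find = timeit.timeit(lambda: any_find(urls, needle), number=20)
print("in:   %.4f s" % t_in)
print("find: %.4f s" % t_find)
```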

Have fun

Answered 2018-08-18T17:30:54.437