7

我有一个大字符串说“aaaaaaaaaaabbbbbbbbbbcccccccccccddddddddddddd”(但可能更长),我有很多小字符串。我想计算(重叠是可以的)在大字符串中找到了多少次小字符串。我只关心速度。KMP 看起来不错,但看起来 Rabin-Karp 处理多个但速度很慢。

4

5 回答 5

7

大多数字符串搜索算法的问题在于,它们至少需要时间 O(k) 才能返回 k 个匹配项,所以如果你有一个字符串,比如 100 万个“a”,以及 100 万个小字符串“a”的查询,那么将需要大约 100 万次迭代来计算所有匹配项!

另一种线性时间方法是:

  1. 构造大字符串的后缀树: O(n) 其中 n 是 len(big string)
  2. 预计算后缀树中每个节点下的后缀个数:O(n)
  3. 对于每个小字符串,在后缀树中找到以小字符串为后缀的节点: O(m) 其中 m 是 len(small string)
  4. 将该节点下方的后缀数添加到总数中。(每个后缀对应大字符串中小字符串的不同匹配)

这将花费时间 O(n+p),其中 n 是大字符串的长度,p 是所有小字符串的总长度。

示例代码

根据要求,这是使用这种方法的 Python 中的一些小(ish)示例代码:

from collections import defaultdict

class SuffixTree:
    def __init__(self):
        """Returns an empty suffix tree"""
        self.T=''
        self.E={}
        self.nodes=[-1] # 0th node is empty string

    def add(self,s):
        """Adds the input string to the suffix tree.

        This inserts all substrings into the tree.
        End the string with a unique character if you want a leaf-node for every suffix.

        Produces an edge graph keyed by (node,character) that gives (first,last,end)
        This means that the edge has characters from T[first:last+1] and goes to node end."""
        origin,first,last = 0,len(self.T),len(self.T)-1
        self.T+=s
        nc = len(self.nodes)
        self.nodes += [-1]*(2*len(s))
        T=self.T
        E=self.E
        nodes=self.nodes

        Lm1=len(T)-1
        for last_char_index in xrange(first,len(T)):
            c=T[last_char_index]
            last_parent_node = -1                    
            while 1:
                parent_node = origin
                if first>last:
                    if (origin,c) in E:
                        break             
                else:
                    key = origin,T[first]
                    edge_first, edge_last, edge_end = E[key]
                    span = last - first
                    A = edge_first+span
                    m = T[A+1]
                    if m==c:
                        break
                    E[key] = (edge_first, A, nc)
                    nodes[nc] = origin
                    E[nc,m] = (A+1,edge_last,edge_end)
                    parent_node = nc
                    nc+=1  
                E[parent_node,c] = (last_char_index, Lm1, nc)
                nc+=1  
                if last_parent_node>0:
                    nodes[last_parent_node] = parent_node
                last_parent_node = parent_node
                if origin==0:
                    first+=1
                else:
                    origin = nodes[origin]

                if first <= last:
                    edge_first,edge_last,edge_end=E[origin,T[first]]
                    span = edge_last-edge_first
                    while span <= last - first:
                        first+=span+1
                        origin = edge_end
                        if first <= last:
                            edge_first,edge_last,edge_end = E[origin,T[first]]
                            span = edge_last - edge_first

            if last_parent_node>0:
                nodes[last_parent_node] = parent_node
            last+=1
            if first <= last:
                    edge_first,edge_last,edge_end=E[origin,T[first]]
                    span = edge_last-edge_first
                    while span <= last - first:
                        first+=span+1
                        origin = edge_end
                        if first <= last:
                            edge_first,edge_last,edge_end = E[origin,T[first]]
                            span = edge_last - edge_first
        return self


    def make_choices(self):
        """Construct a sorted list for each node of the possible continuing characters"""
        choices = [list() for n in xrange(len(self.nodes))] # Contains set of choices for each node
        for (origin,c),edge in self.E.items():
            choices[origin].append(c)
        choices=[sorted(s) for s in choices] # should not have any repeats by construction
        self.choices=choices
        return choices


    def count_suffixes(self,term):
        """Recurses through the tree finding how many suffixes are based at each node.
        Strings assumed to use term as the terminating character"""
        C = self.suffix_counts = [0]*len(self.nodes)
        choices = self.make_choices()
        def f(node=0):
            t=0
            X=choices[node]
            if len(X)==0:
                t+=1 # this node is a leaf node
            else:
                for c in X:
                    if c==term:
                        t+=1
                        continue
                    first,last,end = self.E[node,c]
                    t+=f(end)
            C[node]=t
            return t
        return f()

    def count_matches(self,needle):
        """Return the count of matches for this needle in the suffix tree"""
        i=0
        node=0
        E=self.E
        T=self.T
        while i<len(needle):
            c=needle[i]
            key=node,c
            if key not in E:
                return 0
            first,last,node = E[key]
            while i<len(needle) and first<=last:
                if needle[i]!=T[first]:
                    return 0
                i+=1
                first+=1
        return self.suffix_counts[node]


big="aaaaaaaaaaabbbbbbbbbcccccccccccddddddddddd"
small_strings=["a","ab","abc"]
s=SuffixTree()
term=chr(0)
s.add(big+term)
s.count_suffixes(term)
for needle in small_strings:
    x=s.count_matches(needle)
    print needle,'has',x,'matches'

它打印:

a has 11 matches 
ab has 1 matches 
abc has 0 matches

但是,在实践中,我建议您简单地使用预先存在的 Aho-Corasick 实现,因为我希望在您的特定情况下这会更快。

于 2013-08-03T17:37:24.370 回答
6

对我来说,匹配大量字符串听起来像http://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_string_matching_algorithm。它确实一次找到一个匹配项,因此如果匹配项数量庞大,Peter de Rivaz 的想法可能会更好。另一方面,Aho-Corasick 不需要将大字符串保留在内存中 - 您可以将其流式传输 - 并且实现和调整非常实用 - 维基百科链接指出原始 fgrep 使用了它。

考虑一下,您可以解决大型匹配问题。Aho-Corasick 可以被视为创建了一个确定性的有限状态机,它能够识别它正在搜索的每个字符串。机器的状态对应于最后看到的 N 个字符。如果您希望匹配两个字符串,其中一个是另一个字符串的后缀,您需要小心,当您处于表示您刚刚匹配了较长字符串的状态时,您也认识到这意味着您刚刚匹配了较短的字符串细绳。如果您故意选择不这样做,那么您为较短字符串累积的计数将是不正确的 - 但您知道它们与看到较长字符串的次数相比太低了。因此,如果您修改 Aho-Corasick 以便仅识别和计算每个点上最长的匹配,那么匹配的成本与您正在搜索的字符串中的字符数保持线性关系,您可以通过遍历长字符串然后增加作为长字符串后缀的较短字符串的计数来修复最后的计数. 这将花费时间最多与正在搜索的字符串的总大小呈线性关系。

于 2013-08-03T17:44:15.497 回答
0

1)我会选择有限自动机。现在想不出一个专门的库,但是通用的 PCRE 可以用来构造一个高效地搜索给定子字符串的自动机。对于子字符串“foo”和“bar”,可以构造一个模式 /(foo)|(bar)/,扫描一个字符串并通过迭代 ovector 检查哪个组匹配来获取匹配子字符串的“id”号。
如果您只需要总数而不是按子字符串分组,RE2::FindAndConsume 就很好。
PS使用 Boost.Xpressive 并从地图加载字符串的示例:http: //ericniebler.com/2010/09/27/boost-xpressive-ftw/
PPS最近我玩得很开心,为类似的任务创建了一台 Ragel 机器。对于一小组搜索到的字符串,“正常”DFA 可以工作,如果你有一个更大的规则集,那么使用 Ragel 扫描仪会显示出很好的结果(这里是一个相关的答案)。
PPPS PCRE 有一个MARK对这种子模式分类非常有用的关键字(cf)。

2)很久以前,我在 Scala 中为这种负载编写了一个基于 Trie 的东西:https ://gist.github.com/ArtemGr/6150594 ;Trie.search 遍历字符串,试图将当前位置与 Trie 中编码的数字相匹配。trie 被编码在一个缓存友好的数组中,我希望它与非 JIT DFA 一样好。

3) 我一直在使用 boost::spirit 进行子字符串匹配,但从未测量过它与其他方法的对比。Spirit 使用了一些高效的结构进行symbols匹配,也许该结构可以单独使用,而无需 Spirit 的开销。

#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/phoenix.hpp>
using qi::lit; using qi::alnum; using qi::digit; using qi::_val; using qi::_1; using boost::phoenix::ref;
static struct exact_t: qi::symbols<char, int> {
  exact_t() {add
    ("foo", 1)
    ("bar", 2);}
} exact;
int32_t match = -1;
qi::rule<const char*, int()> rule =
  +alnum >> exact [ref (match) = _1];
const char* it = haystack; // Mutable iterator for Spirit.
qi::parse (it, haystack + haystackLen, rule);
于 2013-08-04T15:11:40.717 回答
0

如果我理解正确,您的输入字符串由许多单字符块组成。

在这种情况下,您可以使用Run-length encoding压缩文本。

例如:

s = aaabbbbcc

被编码为:

encoded_s = (a3)(b4)(c2)

现在您可以在编码文本中搜索模式。

如果你想要一个具体的算法,只需在网络上搜索运行长度编码字符串中匹配的模式。您可以实现时间复杂度O(N + M),其中 N 和 M 是压缩文本压缩模式的长度。M 和 N 通常都比原始长度小得多,因此它优于任何标准模式匹配算法,例如 KMP。

于 2013-08-04T15:24:22.343 回答
0

在另一个答案的基础上(并希望说服您这是最好的答案),您可以查找http://en.wikipedia.org/wiki/Suffix_tree如果您真的想要最快的解决方案,还可以通过那里列出的参考资料了解后缀树,这也可以在不遍历所有匹配项的情况下获得匹配的数量,以及运行时间和内存要求对于任何子字符串匹配或匹配计数算法,你得到的绝对是最好的。一旦您了解了后缀树的工作原理以及如何构建/使用它,那么您唯一需要的额外调整就是存储在树的每个内部节点处表示的不同字符串起始位置的数量,您可以进行一个小的修改通过递归地从子节点获取计数并将它们相加以获得当前节点的计数,可以轻松高效地(线性时间,如前所述)。

于 2013-08-03T19:16:35.053 回答