1

假设以下数据结构具有三个 numpy 数组(id,parent_id)(根元素的 parent_id 为 -1):

import numpy as np
class MyStructure(object):
  def __init__(self):
    """
    Default structure for now:

          1
         / \
        2   3
           / \
          4   5
    """
    self.ids = np.array([1,2,3,4,5])
    self.parent_ids = np.array([-1, 1, 1, 3, 3])

  def id_successors(self, idOfInterest):
    """
    Return logical index.
    """
    return self.parent_ids == idOfInterest

  def subtree(self, newRootElement):
    """
    Return logical index pointing to elements of the subtree.
    """
    init_vector = np.zeros(len(self.ids), bool)
    init_vector[np.where(self.ids==newRootElement)[0]] = 1
    if sum(self.id_successors(newRootElement))==0:
      return init_vector
    else:
      subtree_vec = init_vector
      for sucs in self.ids[self.id_successors(newRootElement)==1]:
        subtree_vec += self.subtree(sucs)
      return subtree_vec

对于许多id (>1000)来说,这真的很慢。有没有更快的方法来实现它?

4

4 回答 4

4

我认为伤害你的不是递归本身,而是每一步都有大量非常广泛的操作(在所有元素上)。考虑:

init_vector[np.where(self.ids==newRootElement)[0]] = 1

它对所有元素进行扫描,计算每个匹配元素的索引,然后仅使用第一个元素的索引。此特定操作可用作列表、元组和数组的方法索引 - 并且速度更快。如果 ID 是唯一的,那么 init_vector 就是 ids==newRootElement 无论如何。

if sum(self.id_successors(newRootElement))==0:

再次对每个元素进行线性扫描,然后对整个数组进行缩减,以检查是否存在任何匹配项。对这种类型的操作使用any,但我们甚至不需要对所有元素进行检查——“如果 newRootElement 不在 self.parent_ids 中”可以完成这项工作,但这不是必需的,因为它是完全有效的 for循环一个空列表。

最后是最后一个循环:

for sucs in self.ids[self.id_successors(newRootElement)==1]:

这一次,重复调用 id_successors,然后将结果不必要地与 1 进行比较。只有在那之后才会出现递归,确保对每个分支重复上述所有操作(针对不同的 newRootElement)。

整个代码是单向树的反向遍历。我们有父母,也需要孩子。如果我们要进行诸如 numpy 设计的广泛操作,我们最好让它们计数 - 因此我们关心的唯一操作是为每个父母建立一个孩子列表。一次迭代并不难做到:

import collections
children=collections.defaultdict(list)
for i,p in zip(ids,parent_ids):
  children[p].append(i)

def subtree(i):
  return i, map(subtree, children[i])

您需要的确切结构将取决于更多因素,例如树更改的频率、树的大小、分支的数量以及您需要请求的子树的大小和数量。例如,上面的字典+列表结构的内存效率不是很高。您的示例也已排序,这可以使操作更加容易。

于 2010-07-29T12:31:04.490 回答
4

如果您使用的是 Python 2.6,您是否尝试过使用 psyco 模块?它有时可以显着加快代码速度。

您是否考虑过递归数据结构:列表?

您的示例也是标准列表:

[1, 2 , [3, [4],[5]]]

或者

[1,[2,无,无],[3,[4,无,无],[5,无,无]]]

通过我漂亮的打印机

[1, 
  [2, None, None], 
  [3, 
    [4, None, None], 
    [5, None, None]]]

子树在那里准备好了,花费你一些时间将值插入右树。还值得检查一下heapq 模块是否符合您的需求。

Guido 本人也在http://python.org/doc/essays/graphs.html中提供了一些关于遍历和树的见解,也许你知道这一点。

这是一些看起来很高级的树的东西,实际上是为 Python 提出的,作为基本的列表类型替换,但在那个函数中被拒绝了。泡罩模块

于 2010-07-28T07:12:14.820 回答
3

理论上,每个算法都可以迭代和递归编写。但这是一个谬误(如图灵完备性)。在实践中,通过迭代遍历任意嵌套的树通常是不可行的。我怀疑还有很多需要优化的地方(至少您正在就地修改 subtree_vec )。对数千个元素执行 x本质上非常昂贵的,无论您是迭代地还是递归地执行它。在具体实现上最多可以进行一些微优化,最多会产生 <5% 的改进。如果您多次需要相同的数据,最好的选择是缓存/记忆。也许有人对您的特定树结构有一个花哨的 O(log n) 算法,我什至不知道是否有可能(我假设没有,但树操作不是)

于 2010-07-28T07:02:57.297 回答
0

这是我的答案(在没有访问您的课程的情况下编写,因此界面略有不同,但我将其按原样附加,以便您可以测试它是否足够快):
=========== ============file graph_array.py==========================


import collections
import numpy

def find_subtree(pids, subtree_id):
    N = len(pids)
    assert 1 <= subtree_id <= N

    subtreeids = numpy.zeros(pids.shape, dtype=bool)
    todo = collections.deque([subtree_id])

    iter = 0
    while todo:
        id = todo.popleft()
        assert 1 <= id <= N
        subtreeids[id - 1] = True

        sons = (pids == id).nonzero()[0] + 1
        #print 'id={0} sons={1} todo={2}'.format(id, sons, todo)
        todo.extend(sons)

        iter = iter+1
        if iter>N:
            raise ValueError()

    return subtreeids

========================file graph_array_test.py======================= ===


import numpy
from graph_array import find_subtree

def _random_graph(n, maxsons):
    import random
    pids = numpy.zeros(n, dtype=int)
    sons = numpy.zeros(n, dtype=int)
    available = []
    for id in xrange(1, n+1):
        if available:
            pid = random.choice(available)

            sons[pid - 1] += 1
            if sons[pid - 1] == maxsons:
                available.remove(pid)
        else:
            pid = -1
        pids[id - 1] = pid
        available.append(id)
    assert sons.max() <= maxsons
    return pids

def verify_subtree(pids, subtree_id, subtree):
    ids = set(subtree.nonzero()[0] + 1)
    sons = set(ids) - set([subtree_id])
    fathers = set(pids[id - 1] for id in sons)
    leafs = set(id for id in ids if not (pids == id).any())
    rest = set(xrange(1, pids.size+1)) - fathers - leafs
    assert fathers & leafs == set()
    assert fathers | leafs == ids
    assert ids & rest == set()

def test_linear_graph_gen(n, genfunc, maxsons):
    assert maxsons == 1
    pids = genfunc(n, maxsons)

    last = -1
    seen = set()
    for _ in xrange(pids.size):
        id = int((pids == last).nonzero()[0]) + 1
        assert id not in seen
        seen.add(id)
        last = id
    assert seen == set(xrange(1, pids.size + 1))

def test_case1():
    """
            1
           / \
          2   4
         /
        3
    """
    pids = numpy.array([-1, 1, 2, 1])

    subtrees = {1: [True, True, True, True],
                2: [False, True, True, False],
                3: [False, False, True, False],
                4: [False, False, False, True]}

    for id in xrange(1, 5):
        sub = find_subtree(pids, id)
        assert (sub == numpy.array(subtrees[id])).all()
        verify_subtree(pids, id, sub)

def test_random(n, genfunc, maxsons):
    pids = genfunc(n, maxsons)
    for subtree_id in numpy.arange(1, n+1):
        subtree = find_subtree(pids, subtree_id)
        verify_subtree(pids, subtree_id, subtree)

def test_timing(n, genfunc, maxsons):
    import time
    pids = genfunc(n, maxsons)
    t = time.time()
    for subtree_id in numpy.arange(1, n+1):
        subtree = find_subtree(pids, subtree_id)
    t = time.time() - t
    print 't={0}s = {1:.2}ms/subtree = {2:.5}ms/subtree/node '.format(
        t, t / n * 1000, t / n**2 * 1000),

def pytest_generate_tests(metafunc):
    if 'case' in metafunc.function.__name__:
        return
    ns = [1, 2, 3, 4, 5, 10, 20, 50, 100, 1000]
    if 'timing' in metafunc.function.__name__:
        ns += [10000, 100000, 1000000]
        pass
    for n in ns:
        func = _random_graph
        for maxsons in sorted(set([1, 2, 3, 4, 5, 10, (n+1)//2, n])):
            metafunc.addcall(
                funcargs=dict(n=n, genfunc=func, maxsons=maxsons),
                id='n={0} {1.__name__}/{2}'.format(n, func, maxsons))
            if 'linear' in metafunc.function.__name__:
                break

===================py.test --tb=short -v -s test_graph_array.py============

...
test_graph_array.py:72: test_timing[n=1000 _random_graph/1] t=13.4850590229s = 13.0ms/subtree = 0.013485ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/2] t=0.318281888962s = 0.32ms/subtree = 0.00031828ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/3] t=0.265519142151s = 0.27ms/subtree = 0.00026552ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/4] t=0.24147105217s = 0.24ms/subtree = 0.00024147ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/5] t=0.211434841156s = 0.21ms/subtree = 0.00021143ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/10] t=0.178458213806s = 0.18ms/subtree = 0.00017846ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/500] t=0.209936141968s = 0.21ms/subtree = 0.00020994ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/1000] t=0.245707988739s = 0.25ms/subtree = 0.00024571ms/subtree/node PASS
...


这里每棵树的每个子树都被提取,有趣的值是提取树的平均时间:每个子树~0.2ms,严格线性树除外。我不确定这里发生了什么。

于 2010-07-29T09:41:47.507 回答