13

我需要快速遍历一棵树,并且我想并行执行。我宁愿使用并行扩展而不是手动启动一堆线程。

我当前的代码如下所示:

   public void Traverse(Node root)
    {
        var nodeQueue = new Queue<Node>();
        nodeQueue.Enqueue(root);
        while (nodeQueue.Count!=0)
        {
            var node = nodeQueue.Dequeue();
            if (node.Property = someValue) DoSomething(node);
            foreach (var node in node.Children)
            {
                nodeQueue.Enqueue(node);
            }
        }
    }

我真的希望 Parallel.ForEach 有一个 Parallel.While 模拟。我看到了 Stephen Toub 的关于使用 Parallel.ForEach 实现 Parallel While的文章。如果正确阅读,这仍然不起作用,因为我正在改变我试图迭代的队列。

我是否需要使用任务工厂和递归(这有风险吗?)?还是有一些我忽略的简单解决方案?

编辑:@svick

这棵树有超过 250,000 个节点。现在的最大深度是 14 个节点,包括根。

离根节点大约有 500 个节点,之后的余额具有相当随机的分布。我很快就会得到一些关于分布的更好的统计数据。

@谜:

是的,树正在被许多用户同时修改,但我通常会为树或子树设置一个共享读锁,或者允许脏读。

对 node.Children 的调用可以被认为是原子的。

DoSomething 实际上是几个委托之一,对于一些昂贵的操作,我可能会收集节点的快照列表并在遍历之外处理它们。

我意识到我可能应该查看一般情况(遍历子树而不是整个树。)为此,我在树的每个节点上运行遍历并查看总时间。

我为每个遍历算法使用了 Parallel.ForEach(nodes, Traverse),其中节点包含所有 ~250k 节点。这模拟(某种程度)许多用户同时请求许多不同的节点。

00256ms 广度优先顺序

00323ms 广度优先顺序与工作(我增加了一个静态计数器作为“工作”)

01495ms 柯克斯第一个答案

01143ms Svicks 第二个答案

00000ms 递归单线程在 60 秒后未完成

00000ms Enigmativity 的答案在 60 秒后没有完成

@Enigma,我认为我可能以某种方式弄乱了您的算法,因为它似乎应该更快。

结果至少可以说让我感到惊讶。我不得不在广度优先顺序上添加一些工作,只是为了让自己相信编译器并没有神奇地优化遍历。

对于头部的单次遍历,并行化第一级只有最好的性能。但几乎没有,随着我在第二级添加更多节点(2000 而不是 500),这个数字有所改善。

4

5 回答 5

8

最直接的方法是Task为每个子节点创建一个,然后等待所有子节点:

public void Traverse(Node root)
{
    if (node.Property == someValue)
        DoSomething(node);

    var tasks = new List<Task>();

    foreach (var node in node.Children)
    {
        // tmp is necessary because of the way closures close over loop variables
        var tmp = node;
        tasks.Add(Task.Factory.StartNew(() => Traverse(tmp)));
    }

    Task.WaitAll(tasks.ToArray());
}

Task是相当轻量级的,因此创建大量它们的效果相当好。但是它们确实有一些开销,所以做一些更复杂的事情,比如让几个任务共享一个队列可能会更快。如果这就是您要走的路,请不要忘记空队列并不意味着所有工作都已完成。如果您采用这种方式,命名空间中的类System.Collections.Concurrent将会派上用场。

编辑:由于树的形状(根有大约 500 个孩子),仅并行处理第一级应该会产生良好的性能:

public void Traverse(Node root, bool parallel = true)
{
    if (node.Property == someValue)
        DoSomething(node);

    if (parallel)
    {
        Parallel.ForEach(node.Children, node =>
        {
            Traverse(node, false);
        });
    }
    else
    {
        foreach (var node in node.Children)
        {
            Traverse(node, false);
        }
    }
}
于 2011-08-17T23:47:25.420 回答
3

由于树的遍历速度非常快,因此调用是原子的,并且需要并行执行Children的委托的昂贵性质,这是我对解决方案的看法。DoSomething

我开始的想法是,我需要一个以节点为参数的函数,创建一个执行的任务DoSomething,递归调用自身为所有子节点创建任务,最后返回一个等待所有内部任务的任务要完成的。

这里是:

Func<Node, Task> createTask = null;
createTask = n =>
{
    var nt = Task.Factory.StartNew(() =>
    {
        if (n.Property == someValue)
            DoSomething(n);
    });
    var nts = (new [] { nt, })
        .Concat(n.Children.Select(cn => createTask(cn)))
        .ToArray();

    return Task.Factory.ContinueWhenAll(nts, ts => { });
};

调用它并等待遍历完成所需的只是:

createTask(root).Wait();

我通过创建一个节点树来测试这一点,它有 500 个子节点,有 14 个级别,每个节点有 1 或 2 个后续子节点。这给了我总共 319,501 个节点。

我创建了一个DoSomething执行一些工作的方法for (var i = 0; i < 100000 ; i++) { };——然后运行上面的代码并将其与串行处理同一棵树进行比较。

并行版本耗时 5,151 毫秒。顺序版本 13,746 毫秒。

我还进行了一项测试,将节点数量减少到 3,196 个,并将处理时间增加了DoSomething100 倍。如果 TPL 的任务快速完成,那么它会非常巧妙地恢复为顺序运行,因此延长处理时间会使代码以更高的并行性运行。

现在并行版本花了 3,203 毫秒。顺序版本耗时 11,581 毫秒。而且,如果我只调用该createTask(root)函数而不等待它完成,它只需要 126 毫秒。这意味着树的遍历速度非常快,因此在遍历期间锁定树并在处理发生时解锁它是有意义的。

我希望这有帮助。

于 2011-08-18T03:15:38.537 回答
3

我可能会遗漏一些东西,但我认为根本不需要 a while。这while只是确保您遍历每个节点。

相反,只需为树中的每个节点递归调用您的函数。

public void Traverse(Node root)
{         
    if (root.Property = someValue) DoSomething(node);    
    Parallel.ForEach<Node>(root.Children, node => Traverse(node));
} 

编辑:当然,如果您更喜欢水平处理而不是垂直处理并且您的昂贵操作是 DoSomething,那么当然可以选择Traverse第一个。

public IEnumerable<Node> Traverse(Node root)
{
    // return all the nodes on this level first, before recurring
    foreach (var node in root.Children)
    {
        if (node.Property == someValue)
            yield return node;
    }

    // next check children of each node
    foreach (var node in root.Children)
    {
        var children = Traverse(node);
        foreach (var child in children)
        {
            yield return child;
        }
    }
}

Parallel.ForEach<Node>(Traverse(n), n => DoSomething(n));
于 2011-08-18T02:16:19.543 回答
1

假设您有p个处理器,也许您对具有p个分区的root.Children执行Parallel.For 。它们中的每一个都将对子树进行传统的单线程遍历、比较,而不是DoSomething ,会将DoSomething的委托排入并发队列。如果分布基本上是随机且平衡的,并且由于遍历只进行遍历/入队,则该部分需要1/p时间。此外,遍历可能会在所有DoSomething执行之前耗尽自身,因此您可以有p个消费者(DoSomething的执行者) 为您提供最大的并行执行,假设所有这些操作都是独立的。

通过对具有随机分布的子树的根子节点的数量进行这种简单的分区,遍历本身将很快。使用每个处理器大致分配的消费者,您还可以获得最大并行DoSomething操作。

于 2011-08-18T04:11:31.783 回答
0

Perhaps using a List or Array instead of queue would help. Also use another List/Array to populate the next nodes to visit. You won't be processing list that until you finish the entire width first anyway. Something like this:

List<Node> todoList = new List<Node>();
todoList.Add(node);
while (todoList.Count > 0)
{
    // we'll be adding next nodes to process to this list so it needs to be thread-safe
    // or just sync access to a non-threadsafe list
    // if you know approx how many nodes you expect, you can pre-size the list
    ThreadSafeList<Node> nextList = new ThreadSafeList<Node>();  

    //todoList is readonly/static so can cache Count in simple variable
    int maxIndex  =  todoList.Count-1;
    // process todoList in parallel
    Parallel.For(0, maxIndex, i =>
    {
        // if list reads are thread-safe then no need to sync, otherwise sync
        Node x = todoList[i];

        //process x;
        // e.g. do somehting, get childrenNodesToWorkOnNext, etc.

        // add any child nodes that need to be processed next
        // e.g. nextList.add(childrenNodesToWorkOnNext);
    });

   // done with parallel processing by here so use the next todo list
   todoList = nextList;
)
于 2013-03-14T23:28:27.267 回答