2

编辑:好的,这是一个更简单的例子来说明我的问题。为什么只有第一个任务被放入队列?

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        pool.put( task!simpleWorker(depth,maxDepth,pool));
    }
}

void main(){
    auto pool = new TaskPool();
    pool.put(task!simpleWorker(0,5,pool));
    pool.finish(true);
    writeln("Done");
}

原来的:

我需要遍历这个 DAG。当我访问一个节点时,我会清理它。在所有父节点都干净之前,我无法清理节点。

我正在尝试的方法是让工作线程的当前节点检查其所有子节点以查看哪些子节点可以处理。任何可以处理的都添加到任务池中。

我的问题是我不知道如何将新任务添加到 TaskPool 并处理它们。这只是清理 DAG 中的第一个节点,然后退出,让其他一切都变脏。

void cleanNode(Node node, TaskPool pool){
    node.doProcess();
    foreach (client; node.clients){
        if (client.canProcess()){
            pool.put(task!cleanNode(client, pool));
        }
    }
}

void main(){
    auto dag = mkTestDag(5);
    auto pool = new TaskPool();

    pool.put( task!cleanNode(dag[0], pool));
    pool.finish(true); 

    writeln("\n\nOutput:");
    foreach (d;dag){
        writeln(d);
        writeln(d.dirty ? "dirty" : "clean","\n");
    }
}

完整代码在这里: http: //pastebin.com/LLfMyKVp

4

3 回答 3

1

这是因为从里面的 simpleWorker 中抛出了一个错误。

此版本显示错误:

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        try {
            pool.put( task!simpleWorker(depth,maxDepth,pool));
        } catch (Error e) {
            writeln("Fail: ", e.msg);
        }
    }
}

void main(){
    auto pool = new TaskPool();
    pool.put(task!simpleWorker(0,5,pool));
    pool.finish(true);
    writeln("Done");
}

和输出:

Depth is: 0
Fail: Cannot submit a new task to a pool after calling finish() or stop().
Done

希望其他人可以解释使用TaskPool的正确方法。

编辑

通过告诉任务像这样运行来让它工作:

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        try 
        {
            auto subWorker = task!simpleWorker(depth,maxDepth, pool);
            pool.put(subWorker);
            subWorker.yieldForce();
        } catch (Error t) {
            writeln("Fail: (",  typeof(t).stringof, ") ", t.msg);
        }
    }
}

void main(){
    auto pool = new TaskPool();

    auto worker = task!simpleWorker(0,5, pool);
    pool.put(worker);
    worker.yieldForce();

    pool.finish(true);
    writeln("Done");
}

输出:

Depth is: 0
Depth is: 1
Depth is: 2
Depth is: 3
Depth is: 4
Done
于 2013-10-24T23:38:58.760 回答
0

要以并行方式迭代,您可以使用parallel

foreach (client; pool.parallel(node.clients,1)){
    if (client.canProcess()){
        cleanNode(client, pool);//<- EDIT: no need to make a task here
    }
}

这将阻塞,直到所有客户端都被迭代(让当前线程也做一些工作)

要开始,您需要执行force第一个任务,main()而不是在池中调用完成

于 2013-10-25T08:20:36.537 回答
0

您有这样的行为是因为您对除根节点以外的所有节点canProcess()都返回 false 。由于调用 canProcess() 来检查是否将新任务放入池中,所以我相信它永远不会这样做......检查这个(修改后的)程序的输出:http: //dpaste.dzfl.pl/ea0c393acleanNodeSimple()

编辑:此外,经过一些分析,循环也只执行一次,因此不会将任何内容添加到任务池中。:)

代码在DPaste上,检查一下。这是可以帮助您解决问题的最有趣的部分:

// Dejan: will be called only once!
// Because canProcess() returns false for all children.
void cleanNodeSimple(Node node, TaskPool pool){
  node.doProcess();
  writeln("cleanNodeSimple() called for node `", node, "`"); 
  foreach (cli; node.clients){
    writeln("I am ", cli, " and I am executed only once, because I am only RootNode's client.");
    if (cli.canProcess()) {
      writeln("I will be executed only once because because ", cli, " has ", cli.clients.length, " clients.");
      writeln("And its master #1 is ", cli.clients[0].masters[0].dirty ? "dirty" : "clean");
      pool.put( task!cleanNodeSimple(cli, pool) );
    }
  }
}
于 2013-10-24T09:14:43.533 回答