4

我目前正在测试 tbb 的流程图功能。为了使用它,我必须能够中止图中某个节点的执行,包括所有依赖它的子节点,但让其他不依赖它的子节点执行。从主体中抛出异常或调用 task::cancel_group_execution() 会中止所有节点的执行。

#include <cstdio>
#include "tbb/flow_graph.h"

using namespace tbb::flow;

struct body
{   std::string my_name;
    body( const char *name ) : my_name(name)
    {
    }
    void operator()( continue_msg ) const
    {   if (my_name == "B")
            tbb::task::self().group()->cancel_group_execution();
        else
        {   sleep(1);
            printf("%s\n", my_name.c_str());
        }
    }
};

int main()
{
    graph g;

    broadcast_node< continue_msg > start(g);
    continue_node<continue_msg> a( g, body("A"));
    continue_node<continue_msg> b( g, body("B"));
    continue_node<continue_msg> c( g, body("C"));
    continue_node<continue_msg> d( g, body("D"));
    continue_node<continue_msg> e( g, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i )
        try
        {   start.try_put( continue_msg() );
            g.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
    return 0;
}
4

2 回答 2

5

如果您希望能够取消部分图表的执行,则需要使用 task_group_contexts。添加以下内容:

#include "tbb/task.h"

并将您的主程序更改为以下内容:

int main()
{
    tbb::task_group_context tgc1;
    tbb::task_group_context tgc2;
    graph g1(tgc1);
    graph g2(tgc2);
    printf("Constructing graph\n");
    broadcast_node< continue_msg > start(g1);
    continue_node<continue_msg> a( g1, body("A"));
    continue_node<continue_msg> b( g2, body("B"));
    continue_node<continue_msg> c( g2, body("C"));
    continue_node<continue_msg> d( g2, body("D"));
    continue_node<continue_msg> e( g1, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i ) {
        try
        {   
            printf("broadcasting graph %d\n", i);
            start.try_put( continue_msg() );
            g1.wait_for_all();
            g2.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
        g1.wait_for_all();
        g1.reset();
        g2.reset();
    }
    return 0;
}

每个 task_group_context 都是(默认)父上下文的子上下文。取消 g2 不会影响 g1。如果 B 抛出而不是取消,您的 catch 将确保异常不会传递给父级。如果您没有捕获异常,父上下文也将被取消,A 和 E 的上下文也将被取消。

具有多个 task_group_contexts 的图

请注意,您必须等待两个图表完成。您还必须reset()使用图表来重置continue_nodes' 计数器。实际上,在抛出并捕获异常的情况下,并不能保证 g1 完成后 g1catch(...)完成,因此需要g1.wait_for_all()try/catch. 我编辑了代码以显示这一点。

multifunction_node您可以使用 a的输入continue_msg和 a 的单个输出来制作 B a ,而不是使用取消来停止部分计算continue_msg

typedef multifunction_node<continue_msg, tuple<continue_msg> > mf_type;

struct mf_body {
    std::string my_name;
    mf_body(const char *name) : my_name(name) {}
    void operator()(continue_msg, mf_type::output_ports_type &op) {
        if(my_name == "B") {
            printf("%s returning without sending\n", my_name.c_str());
            return;
        }
        sleep(1);
        get<0>(op).try_put(continue_msg());
        return;
    }
};

然后创建节点 B:

mf_type b( g, unlimited, mf_body("B"));

并且从 B 到 C 的边缘将设置如下:

make_edge( output_port<0>(b), c ); 

在这种情况下,您不需要将图拆分为两个子图。如果节点 B 已经取消,则它会返回而不将 a 转发continue_msg给其后继者。如果节点 B 不转发消息,节点 C 将不会执行,因为它需要两个continue_msgs启动。之后您仍然需要重置图表,以重置 C 的计数。

multifunction_node优点是您可以选择转发或不转发消息。这里需要注意的是,multifunction_node带有continue_msg输入的 a 不像 a continue_nodecontinue_node需要尽可能多的continue_msgs前辈(加上构造时的初始化值)。multifunction_node主体在收到 a 时执行continue_msg,无论它有多少前辈。因此,对于您的图表,您不能只制作所有节点multifunction_nodes

于 2014-01-16T17:37:47.840 回答
2

bool您可以用代替来表示中止状态continue_msg。每个process_node接收前驱节点状态并在可用时处理任务,并将更新的中止状态发送到后继节点。

struct body
{   std::string my_name;
    body( const char *name ) : my_name(name)
    {
    }
    bool operator()( bool avail ) const
    {   if (!avail)
           printf("%s skipped\n", my_name.c_str());
        else
            if (my_name == "B")
            {   printf("%s fail\n", my_name.c_str());
                avail = false;  // fail task
            }
            else
            {   sleep(1);
                printf("%s\n", my_name.c_str());
            }
        return avail;
    }
};

int main()
{
    graph g;

    typedef function_node<bool, bool> process_node;
    typedef std::tuple<bool,bool> bool_pair;
    broadcast_node< bool > start(g);
    process_node a( g, unlimited, body("A"));
    process_node b( g, unlimited, body("B"));
    process_node c( g, unlimited, body("C"));
    join_node<bool_pair> join_c(g);
    function_node<bool_pair, bool> and_c(g, unlimited, [](const bool_pair& in)->bool {
        return std::get<0>(in) && std::get<1>(in);
    });
    process_node d( g, unlimited, body("D"));
    process_node e( g, unlimited, body("E"));

    /*
     * start -+-> A -+-> E
     *        |       \
     *        |        \
     *        |         join_c -> and_c -> C -> D
     *        |        /
     *        |       /
     *        +-> B -- 
     */
    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, input_port<0>(join_c) );
    make_edge( b, input_port<1>(join_c) );
    make_edge( join_c, and_c );
    make_edge( and_c, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i )
        try
        {   start.try_put( true );
            g.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
    return 0;
}
于 2014-01-17T00:50:19.640 回答