0

简而言之,我有一个提供 shared_ptr 类型的数据源。这个指针似乎直接超出了 source_node 的运算符重载的范围。我添加了一个完全简化的示例来演示该问题。

我的问题:什么是优雅的为什么要克服这个问题?

#include <exception>
#include <tbb/concurrent_queue.h>
#include <tbb/flow_graph.h>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <memory>
#include <iostream>

using namespace tbb::flow;

// Plain Data Object
class Data
{
public:
    size_t _data;
    Data(size_t d) : _data(d){}

};


// simpel counting node
class CountingNode
{
private:
    size_t _count;
public:
    CountingNode():_count(0){}
    size_t operator()(std::shared_ptr<Data> data)
    {
        if (static_cast<bool>(data)) // <-- PROBLEM always false
            _count += data->_data;

        std::cout << _count << std::endl;
        return _count;
    }
    size_t count() { return _count;}
};

// Source,
// exhausted when a empty/null shared_ptr is encountered
// our datasource provides a shared_ptr to a data object
class SourceNode
{
public:

    std::shared_ptr<tbb::concurrent_bounded_queue<std::shared_ptr<Data>>> _queue;
    std::shared_ptr<std::mutex> _mtx;
    bool _started;

    SourceNode() : _started(false)
    {
        _queue = std::shared_ptr<tbb::concurrent_bounded_queue<std::shared_ptr<Data>>>(new tbb::concurrent_bounded_queue<std::shared_ptr<Data>>());
        _mtx = std::shared_ptr<std::mutex>(new std::mutex());
    }

    SourceNode(const SourceNode& other)
    {
        _queue = other._queue;
        _mtx = other._mtx;
        _started = other._started;
    }

    void push(std::shared_ptr<Data> data)
    {
        _queue->push(data);
    }

    bool operator ()(std::shared_ptr<Data> data)
    {
        std::unique_lock<std::mutex> ul(*_mtx);
        {
            if (!_started)
            {
                _started = true;
                for(size_t idx = 0; idx != 10; ++idx)
                    _queue->push(std::shared_ptr<Data>(new Data(idx)));

                _queue->push(std::shared_ptr<Data>()); // no more data
            }
        }

        _queue->pop(data);
        return static_cast<bool>(data);
    }

    void close()
    {
        _queue->push(std::shared_ptr<Data>());
    }
};




int main(int argc, char* argv[])
{
    graph g;
    source_node<std::shared_ptr<Data>> source(g,SourceNode(),false);
    function_node<std::shared_ptr<Data>,int> sink(g,tbb::flow::serial, CountingNode());
    make_edge(source, sink );

    source.activate();
    g.wait_for_all();

    return 0;
}

如果需要提供更多详细信息,请告诉我。

亲切的问候 Auke-Dirk

在此处输入图像描述

4

1 回答 1

1

您应该接受通过引用data传递的参数SourceNode以将更新的值返回给调用者:

class SourceNode
{
    ...
    bool operator ()(std::shared_ptr<Data>& data)
    {
        ...
        _queue->pop(data);
        return static_cast<bool>(data);
    }
}

否则,如果由操作员更新,则只有参数的本地副本,data并且对默认构造(空)值执行进一步的操作。

于 2015-05-27T12:11:18.327 回答