1

我正在使用 gcc 4.7 在 ubuntu 下工作。我正在尝试制作一个网络库来通过 udp 发送数据。

我正在使用 boost::asio::io_service 和多个调用 io_service::run() 的线程来设置线程池。我正在使用 async_receive_from 来等待数据而不阻塞线程。一旦读取了一些数据,就会启动一个新的 async_read,并且接收到的数据由接收它的线程向上传递到我的堆栈。我正在为我从其中接收数据的每个地址创建一个单独的堆栈,并使用 tbb::concurrent_hash_map 在消息向上传递时锁定每个堆栈。您可以在下面看到相关的代码。

现在我的问题是,当我在线程池中仅使用 1 个线程运行时,我收到了大部分数据包,但是当我在池中使用 2 个线程运行时,我丢弃了更多数据包。从这里我正在阅读线程问题,但我无法弄清楚我的问题到底是什么。我的分析器(valgrind --tool=callgrind)中没有显示任何内容。有没有人有任何想法?

public :    
    /* Virtual functions for sending and receiving messages */
    virtual void received(const boost::asio::ip::address &addr, msg_data *const data, const msg_header *const header) override
    {
        /* Pend the next read */
        start_receiving();

        /* Check if there exists a parallel stack for this address, if not create it */
        typename stack_map::accessor stack_acc;
        if (_strands.insert(stack_acc, addr.to_string()))
        {
            std::cout << "cloning stack: " << addr.to_string() << std::endl;
            stack_acc->second = this->_up_node->clean_clone();
        }

        /* While holding the lock on this stack, propagate the message */
        stack_acc->second->received(addr, data, header);
    }

    /* Start waiting for data on the socket */
    virtual void start_receiving() override
    {
        /* Prepare receive buffer */
        char *data_buf = new char [MAX_UDP_SIZE];
        std::array<boost::asio::mutable_buffer, 2> recv_buf =
        {{
            boost::asio::buffer(_head_buf, HEADER_SIZE),
            boost::asio::buffer(data_buf, MAX_UDP_SIZE)
        }};

        /* Wait for data */
        _recv_socket.async_receive_from(recv_buf, _recv_endpoint,
            [&,data_buf](const boost::system::error_code &, size_t bytes_transferred)
            {
                received(_recv_endpoint.address(), new msg_data(data_buf, bytes_transferred - HEADER_SIZE), new msg_header(_head_buf));
            });
    }
private :
    typedef tbb::concurrent_hash_map<std::string, stack_component*> stack_map;
    stack_map _strands;

编辑: 所以我将我的代码重写为下面的代码,它的性能不再受线程数量的影响。我猜这个问题与跨多个处理器缓存同步数据有关,但我没有任何证据。有没有人有更好的猜测或我可以收集证据的某种方式(最好是免费的方式:))?

    /* Start waiting for data on the socket */
    virtual void start_receiving() override
    {
        while (true)
        {
            /* Prepare receive buffer */
            char *data_buf = new char [MAX_UDP_SIZE];
            char *head_buf = new char [HEADER_SIZE];
            std::array<boost::asio::mutable_buffer, 2> recv_buf =
            {{
                boost::asio::buffer(head_buf, HEADER_SIZE),
                boost::asio::buffer(data_buf, MAX_UDP_SIZE)
            }};

            /* Wait for data */
            size_t bytes_transferred = _recv_socket.receive_from(recv_buf, _recv_endpoint);

            _io_service.post([&,head_buf,data_buf,bytes_transferred]()
                {
                    msg_header *header = new msg_header(head_buf);
                    delete [] head_buf;
                    received(_recv_endpoint.address(), new msg_data(data_buf, bytes_transferred - HEADER_SIZE), header);
                });
        }
    }

    /* Virtual functions for sending and receiving messages */
    virtual void received(const boost::asio::ip::address &addr, msg_data *const data, const msg_header *const header) override
    {
        /* Check if there exists a parallel stack for this address, if not create it */
        typename stack_map::accessor stack_acc;
        if (_strands.insert(stack_acc, addr.to_string()))
        {
            stack_acc->second = this->_up_node->clean_clone();
        }

        /* While holding the lock on this stack, propagate the message */
        stack_acc->second->received(addr, data, header);
    }
4

0 回答 0