我正在使用 gcc 4.7 在 ubuntu 下工作。我正在尝试制作一个网络库来通过 udp 发送数据。
我正在使用 boost::asio::io_service 和多个调用 io_service::run() 的线程来设置线程池。我正在使用 async_receive_from 来等待数据而不阻塞线程。一旦读取了一些数据,就会启动一个新的 async_read,并且接收到的数据由接收它的线程向上传递到我的堆栈。我正在为我从其中接收数据的每个地址创建一个单独的堆栈,并使用 tbb::concurrent_hash_map 在消息向上传递时锁定每个堆栈。您可以在下面看到相关的代码。
现在我的问题是,当我在线程池中仅使用 1 个线程运行时,我收到了大部分数据包,但是当我在池中使用 2 个线程运行时,我丢弃了更多数据包。从这里我正在阅读线程问题,但我无法弄清楚我的问题到底是什么。我的分析器(valgrind --tool=callgrind)中没有显示任何内容。有没有人有任何想法?
public :
/* Virtual functions for sending and receiving messages */
virtual void received(const boost::asio::ip::address &addr, msg_data *const data, const msg_header *const header) override
{
/* Pend the next read */
start_receiving();
/* Check if there exists a parallel stack for this address, if not create it */
typename stack_map::accessor stack_acc;
if (_strands.insert(stack_acc, addr.to_string()))
{
std::cout << "cloning stack: " << addr.to_string() << std::endl;
stack_acc->second = this->_up_node->clean_clone();
}
/* While holding the lock on this stack, propagate the message */
stack_acc->second->received(addr, data, header);
}
/* Start waiting for data on the socket */
virtual void start_receiving() override
{
/* Prepare receive buffer */
char *data_buf = new char [MAX_UDP_SIZE];
std::array<boost::asio::mutable_buffer, 2> recv_buf =
{{
boost::asio::buffer(_head_buf, HEADER_SIZE),
boost::asio::buffer(data_buf, MAX_UDP_SIZE)
}};
/* Wait for data */
_recv_socket.async_receive_from(recv_buf, _recv_endpoint,
[&,data_buf](const boost::system::error_code &, size_t bytes_transferred)
{
received(_recv_endpoint.address(), new msg_data(data_buf, bytes_transferred - HEADER_SIZE), new msg_header(_head_buf));
});
}
private :
typedef tbb::concurrent_hash_map<std::string, stack_component*> stack_map;
stack_map _strands;
编辑: 所以我将我的代码重写为下面的代码,它的性能不再受线程数量的影响。我猜这个问题与跨多个处理器缓存同步数据有关,但我没有任何证据。有没有人有更好的猜测或我可以收集证据的某种方式(最好是免费的方式:))?
/* Start waiting for data on the socket */
virtual void start_receiving() override
{
while (true)
{
/* Prepare receive buffer */
char *data_buf = new char [MAX_UDP_SIZE];
char *head_buf = new char [HEADER_SIZE];
std::array<boost::asio::mutable_buffer, 2> recv_buf =
{{
boost::asio::buffer(head_buf, HEADER_SIZE),
boost::asio::buffer(data_buf, MAX_UDP_SIZE)
}};
/* Wait for data */
size_t bytes_transferred = _recv_socket.receive_from(recv_buf, _recv_endpoint);
_io_service.post([&,head_buf,data_buf,bytes_transferred]()
{
msg_header *header = new msg_header(head_buf);
delete [] head_buf;
received(_recv_endpoint.address(), new msg_data(data_buf, bytes_transferred - HEADER_SIZE), header);
});
}
}
/* Virtual functions for sending and receiving messages */
virtual void received(const boost::asio::ip::address &addr, msg_data *const data, const msg_header *const header) override
{
/* Check if there exists a parallel stack for this address, if not create it */
typename stack_map::accessor stack_acc;
if (_strands.insert(stack_acc, addr.to_string()))
{
stack_acc->second = this->_up_node->clean_clone();
}
/* While holding the lock on this stack, propagate the message */
stack_acc->second->received(addr, data, header);
}