1

我有map<int, queue<int>>一个线程写入它,即将消息推送到队列中。它们的键是指 a client_id,并且队列为客户端保存消息。我希望使这个读写线程安全。

目前,写入它的线程做这样的事情

map<int, queue<int>> msg_map;
if (msg_map.find(client_id) != msg_map.end())
{
    queue<int> dummy_queue;
    dummy_queue.push(msg); //msg is an int
    msg_map.insert(make_pair(client_id, dummy_queue);
}
else
{
    msg_map[client_id].push(msg);
}

有许多客户正在阅读 - 并从这张地图中删除。

if (msg_map.find(client_id) != msg_map.end())
{
    if (!msg_map.find(client_id)->second.empty())
    {
        int msg_rxed = msg_map[client_id].front();

        //processing message

        msg_map[client_id].pop();
    }
}

我正在阅读有关互斥锁的文章(以前没有使用过),我想知道应该在何时何地锁定互斥锁。我的困惑在于他们正在访问单独的队列(保存在同一个地图中)。我是锁定队列还是锁定地图?

是否有标准/公认的方法来做到这一点 - 并且使用互斥锁是最好的方法吗?有 0 个客户端线程,只有 1 个单写线程。

4

2 回答 2

1

简化和优化您的代码

现在我们不关心互斥体,稍后我们会在代码稍微清理一下时处理它(这样会更容易)。

首先,从您展示的代码来看,似乎没有理由使用有序std::map(对数复杂度),您可以使用更高效的std::unordered_map(平均恒定时间复杂度)。选择完全取决于您,如果您不需要订购容器,您只需更改其声明:

std::map<int, std::queue<int>> msg_map;
// or
std::unordered_map<int, std::queue<int>> msg_map; // C++11 only though

现在,地图在设计上非常高效,但是如果您坚持对每项操作进行查找,那么您将失去地图的所有优势。

关于编写器线程,您的所有代码块(对于编写器)都可以仅由这一行有效地替换:

msg_map[client_id].push(msg);

请注意,operator[]对于两者std::mapstd::unordered_map定义为:

使用作为键和默认构造的映射值将新元素插入到容器中key,并返回对新构造的映射值的引用。如果具有key键的元素已经存在,则不执行插入并返回对其映射值的引用。

关于您的阅读器线程,您不能直接使用operator[],因为如果当前不存在特定的条目,它将创建一个新条目,client_id因此,您需要缓存返回的迭代器find以重用它,从而避免无用的查找:

auto iter = msg_map.find(client_id);
// iter will be either std::map<int, std::queue<int>>::iterator
//                  or std::unordered_map<int, std::queue<int>>::iterator
if (iter != msg_map.end()) {
    std::queue<int>& q = iter->second;
    if (!q.empty()) {
        int msg = q.front();
        q.pop();
        // process msg
    }
}

我之所以pop在处理消息之前立即发送消息,是因为当我们添加互斥锁时它会提高并发性(我们可以更快地解锁互斥锁,这总是好的)。

使代码线程安全

@hmjd 关于多个锁(地图一个,每个队列一个)的想法很有趣,但是根据您向我们展示的代码,我不同意:您从额外并发中获得的任何好处很可能会被额外的时间所抵消它需要锁定队列互斥体(实际上,锁定互斥体是一项非常昂贵的操作),更不用说您必须处理的额外代码复杂性。我会把钱押在一个更有效的互斥体上(同时保护地图和所有队列)。

顺便说一句,如果您想使用更高效的方式,单个互斥体可以解决迭代器失效问题std::unordered_mapstd::map尽管不会遇到该问题)。

假设 C++11,只需声明 astd::mutex以及您的地图:

std::mutex msg_map_mutex;
std::map<int, std::queue<int>> msg_map; // or std::unordered_map

保护 writer 线程非常简单,只需在访问 map 之前锁定 mutex:

std::lock_guard<std::mutex> lock(msg_map_mutex);
// the lock is held while the lock_guard object stays in scope
msg_map[client_id].push(msg);

保护阅读器线程几乎没有任何困难,唯一的技巧是您可能希望尽快解锁互斥锁以提高并发性,因此您必须使用std::unique_lock(可以提前解锁)而不是std::lock_guard(只能解锁当它超出范围时):

std::unique_lock<std::mutex> lock(msg_map_mutex);
auto iter = msg_map.find(client_id);
if (iter != msg_map.end()) {
    std::queue<int>& q = iter->second;
    if (!q.empty()) {
        int msg = q.front();
        q.pop();
        // assuming you don't need to access the map from now on, let's unlock
        lock.unlock();
        // process msg, other threads can access the map concurrently
    }
}

如果您不能使用 C++11,则必须替换std::mutexet al。使用您的平台提供的任何东西(pthreads、Win32、...)或使用boost等效项(与特定于平台的原语不同,它具有与新的 C++11 类一样可移植和易于使用的优点)。

于 2013-04-23T11:39:45.657 回答
0

map在修改两个结构时,对和 的读取和写入访问queue同步,包括map

map<int, queue<int>> msg_map;
if (msg_map.find(client_id) != msg_map.end())
{
    queue<int> dummy_queue;
    dummy_queue.push(msg); //msg is an int
    msg_map.insert(make_pair(client_id, dummy_queue);
}
else
{
    msg_map[client_id].push(msg); // Modified here.
}

两个选项是 a mutex,它同时锁定mapand或对 the和 a perqueue具有互斥锁。第二种方法更可取,因为它减少了持有单个锁的时间长度,并且意味着多个线程可以同时更新多个队列。mapmutexqueue

于 2013-04-23T09:46:46.067 回答