在我目前的情况下,速度至关重要,我有一个只能由多个线程读取的地图,而且效果很好。现在出现了一个要求,可能需要在其他线程读取映射时偶尔写入静态映射。我相信这会改变游戏规则,因为我需要锁定我的地图以确保线程安全。这会带来一个问题,因为我有多个线程 10-12 线程将读取地图。如果一张地图在地图上锁定(因为它被读取),我相信锁定是必要的,因为可能会将某些内容写入地图。无论如何,正如我之前所说,如果一个地图正在读取,那么其他地图将不会像之前那样拥有对地图的并行读取访问权限。有什么办法可以规避这个问题吗?
6 回答
您可以使用shared_mutex
地图旁边的 获取共享或唯一访问权限。通常,写操作需要唯一访问,而读操作需要共享访问。
只要没有线程持有唯一访问权限,任意数量的线程都可以获得共享访问权限。如果一个线程试图获取唯一访问,它会等待直到所有共享访问被释放。
标准库和 Boost 提供shared_lock<T>
和unique_lock<T>
用于范围受限的shared_mutex
.
请注意,有些人声称shared_mutex
表现不佳,尽管我没有看到任何证据或强有力的分析来支持这些主张。如果它对你很重要,它可能值得研究。
只是为了您的 c++ 乐趣,阅读这本书,您会发现 WAY 比花费的钱更有价值,您的并发世界将得到广泛的开放
C++-Concurrency in Action Practical Multithreading
本书处理线程数据之间的各种问题和实际解决方案共享、如何唤醒线程、创建线程池等等……更多……还有更多
这里是不使用 atomic 或 shared_locks 在线程之间共享数据的示例
template<class T>
class TaskQueue
{
public:
TaskQueue(){}
TaskQueue& operator = (TaskQueue&) = delete;
void Push(T value){
std::lock_guard<std::mutex> lk(mut);
data.push(value);
condition.notify_one(); //if you have many threads trying to access the data at same time, this will wake one thread only
}
void Get(T& value){
std::unique_lock<std::mutex> lk(mut);
condition.wait(lk, [this]{ return !data.empty(); }); // in this case it waits if queue is empty, if not needed you can remove this line
value = data.front();
data.pop();
lk.unlock();
}
private:
std::mutex mut;
std::queue<T> data; //in your case change this to a std::map
std::condition_variable condition;
};
解决方案之一可能是保留指向该映射的指针,当您需要修改它时 - 制作副本,修改该副本,然后自动将指针交换到新实例。此解决方案会消耗更多内存,但如果您有许多读取线程,则可能会更有效,因为此方法是无锁的。
在下面的例子中,只有一个线程可以修改地图。这并不意味着一次只有一个线程,它意味着在数据结构的整个生命周期中只有一个线程。否则,需要在持有保护整个代码的互斥锁的同时进行修改updateMap
。阅读器线程可以theData
照常访问 - 无需任何锁定。
typedef std::map<...> Data;
std::atomic<Data *> theData;
void updateMap( ... )
{
Data *newData = new Data( *theData );
// modify newData here
Data *old = theData.exchange( newData );
delete old;
}
这是我在不使用 stl 容器的情况下实现的线程安全通用可调整大小哈希图:
#pragma once
#include <iomanip>
#include <exception>
#include <mutex>
#include <condition_variable>
/*
* wrapper for items stored in the map
*/
template<typename K, typename V>
class HashItem {
public:
HashItem(K key, V value) {
this->key = key;
this->value = value;
this->nextItem = nullptr;
}
/*
* copy constructor
*/
HashItem(const HashItem & item) {
this->key = item.getKey();
this->value = item.getValue();
this->nextItem = nullptr;
}
void setNext(HashItem<K, V> * item) {
this->nextItem = item;
}
HashItem * getNext() {
return nextItem;
}
K getKey() {
return key;
}
V getValue() {
return value;
}
void setValue(V value) {
this->value = value;
}
private:
K key;
V value;
HashItem * nextItem;
};
/*
* template HashMap for storing items
* default hash function HF = std::hash<K>
*/
template <typename K, typename V, typename HF = std::hash<K>>
class HashMap {
public:
/*
* constructor
* @mSize specifies the bucket size og the map
*/
HashMap(std::size_t mSize) {
// lock initialization for single thread
std::lock_guard<std::mutex>lock(mtx);
if (mSize < 1)
throw std::exception("Number of buckets ust be greater than zero.");
mapSize = mSize;
numOfItems = 0;
// initialize
hMap = new HashItem<K, V> *[mapSize]();
}
/*
* for simplicity no copy constructor
* anyway we want test how different threads
* use same instance of the map
*/
HashMap(const HashMap & hmap) = delete;
/*
* inserts item
* replaces old value with the new one when item already exists
* @key key of the item
* @value value of the item
*/
void insert(const K & key, const V & value) {
std::lock_guard<std::mutex>lock(mtx);
insertHelper(this->hMap, this->mapSize, numOfItems, key, value);
condVar.notify_all();
}
/*
* erases item with key when siúch item exists
* @key of item to erase
*/
void erase(const K & key) {
std::lock_guard<std::mutex>lock(mtx);
// calculate the bucket where item must be inserted
std::size_t hVal = hashFunc(key) % mapSize;
HashItem<K, V> * prev = nullptr;
HashItem<K, V> * item = hMap[hVal];
while ((item != nullptr) && (item->getKey() != key)) {
prev = item;
item = item->getNext();
}
// no item found with the given key
if (item == nullptr) {
return;
}
else {
if (prev == nullptr) {
// item found is the first item in the bucket
hMap[hVal] = item->getNext();
}
else {
// item found in one of the entries in the bucket
prev->setNext(item->getNext());
}
delete item;
numOfItems--;
}
condVar.notify_all();
}
/*
* get element with the given key by reference
* @key is the key of item that has to be found
* @value is the holder where the value of item with key will be copied
*/
bool getItem(const K & key, V & value) const {
std::lock_guard<std::mutex>lock(mtx);
// calculate the bucket where item must be inserted
std::size_t hVal = hashFunc(key) % mapSize;
HashItem<K, V> * item = hMap[hVal];
while ((item != nullptr) && (item->getKey() != key))
item = item->getNext();
// item not found
if (item == nullptr) {
return false;
}
value = item->getValue();
return true;
}
/*
* get element with the given key by reference
* @key is the key of item that has to be found
* shows an example of thread waitung for some condition
* @value is the holder where the value of item with key will be copied
*/
bool getWithWait(const K & key, V & value) {
std::unique_lock<std::mutex>ulock(mtxForWait);
condVar.wait(ulock, [this] {return !this->empty(); });
// calculate the bucket where item must be inserted
std::size_t hVal = hashFunc(key) % mapSize;
HashItem<K, V> * item = hMap[hVal];
while ((item != nullptr) && (item->getKey() != key))
item = item->getNext();
// item not found
if (item == nullptr) {
return false;
}
value = item->getValue();
return true;
}
/*
* resizes the map
* creates new map on heap
* copies the elements into new map
* @newSize specifies new bucket size
*/
void resize(std::size_t newSize) {
std::lock_guard<std::mutex>lock(mtx);
if (newSize < 1)
throw std::exception("Number of buckets must be greater than zero.");
resizeHelper(newSize);
condVar.notify_all();
}
/*
* outputs all items of the map
*/
void outputMap() const {
std::lock_guard<std::mutex>lock(mtx);
if (numOfItems == 0) {
std::cout << "Map is empty." << std::endl << std::endl;
return;
}
std::cout << "Map contains " << numOfItems << " items." << std::endl;
for (std::size_t i = 0; i < mapSize; i++) {
HashItem<K, V> * item = hMap[i];
while (item != nullptr) {
std::cout << "Bucket: " << std::setw(3) << i << ", key: " << std::setw(3) << item->getKey() << ", value:" << std::setw(3) << item->getValue() << std::endl;
item = item->getNext();
}
}
std::cout << std::endl;
}
/*
* returns true when map has no items
*/
bool empty() const {
std::lock_guard<std::mutex>lock(mtx);
return numOfItems == 0;
}
void clear() {
std::lock_guard<std::mutex>lock(mtx);
deleteMap(hMap, mapSize);
numOfItems = 0;
hMap = new HashItem<K, V> *[mapSize]();
}
/*
* returns number of items stored in the map
*/
std::size_t size() const {
std::lock_guard<std::mutex>lock(mtx);
return numOfItems;
}
/*
* returns number of buckets
*/
std::size_t bucket_count() const {
std::lock_guard<std::mutex>lock(mtx);
return mapSize;
}
/*
* desctructor
*/
~HashMap() {
std::lock_guard<std::mutex>lock(mtx);
deleteMap(hMap, mapSize);
}
private:
std::size_t mapSize;
std::size_t numOfItems;
HF hashFunc;
HashItem<K, V> ** hMap;
mutable std::mutex mtx;
mutable std::mutex mtxForWait;
std::condition_variable condVar;
/*
* help method for inserting key, value item into the map hm
* mapSize specifies the size of the map, items - the number
* of stored items, will be incremented when insertion is completed
* @hm HashMap
* @mSize specifies number of buckets
* @items holds the number of items in hm, will be incremented when insertion successful
* @key - key of item to insert
* @value - value of item to insert
*/
void insertHelper(HashItem<K, V> ** hm, const std::size_t & mSize, std::size_t & items, const K & key, const V & value) {
std::size_t hVal = hashFunc(key) % mSize;
HashItem<K, V> * prev = nullptr;
HashItem<K, V> * item = hm[hVal];
while ((item != nullptr) && (item->getKey() != key)) {
prev = item;
item = item->getNext();
}
// inserting new item
if (item == nullptr) {
item = new HashItem<K, V>(key, value);
items++;
if (prev == nullptr) {
// insert new value as first item in the bucket
hm[hVal] = item;
}
else {
// append new item on previous in the same bucket
prev->setNext(item);
}
}
else {
// replace existing value
item->setValue(value);
}
}
/*
* help method to resize the map
* @newSize specifies new number of buckets
*/
void resizeHelper(std::size_t newSize) {
HashItem<K, V> ** newMap = new HashItem<K, V> *[newSize]();
std::size_t items = 0;
for (std::size_t i = 0; i < mapSize; i++) {
HashItem<K, V> * item = hMap[i];
while (item != nullptr) {
insertHelper(newMap, newSize, items, item->getKey(), item->getValue());
item = item->getNext();
}
}
deleteMap(hMap, mapSize);
hMap = newMap;
mapSize = newSize;
numOfItems = items;
newMap = nullptr;
}
/*
* help function for deleting the map hm
* @hm HashMap
* @mSize number of buckets in hm
*/
void deleteMap(HashItem<K, V> ** hm, std::size_t mSize) {
// delete all nodes
for (std::size_t i = 0; i < mSize; ++i) {
HashItem<K, V> * item = hm[i];
while (item != nullptr) {
HashItem<K, V> * prev = item;
item = item->getNext();
delete prev;
}
hm[i] = nullptr;
}
// delete the map
delete[] hm;
}
};
其他两个答案都很好,但我认为我应该添加一点颜色:
Cliff Click 用 Java 编写了一个无锁并发散列映射。 让它适应 C++(没有 GC,不同的内存模型等)将是不平凡的,但它是我见过的无锁数据结构的最佳实现。如果您可以使用 JAva 而不是 C++,这可能是要走的路。
我不知道任何无锁平衡二叉树结构。这并不意味着它们不存在。
使用其他两个答案之一(批量复制/原子交换/类似shared_ptr
或读写器锁定)来控制对它的访问可能是最简单的map
。根据读写的相对数量和map
; 的大小,两者之一会更快。您应该进行基准测试以查看应该使用哪一个。
您需要的是等效于 Java 中的 ConcurrentHashMap,它允许并发读取和写入底层哈希表。此类是 java.util.concurrent 包的一部分,提供并发读写(最高并发级别,默认为 16)。
您可以在javadoc中找到更多信息。我在这里引用javadoc:
支持检索的完全并发和可调整的预期更新并发的哈希表。此类遵循与 Hashtable 相同的功能规范,并包含与 Hashtable 的每个方法对应的方法版本。但是,即使所有操作都是线程安全的,检索操作也不需要锁定,并且不支持以阻止所有访问的方式锁定整个表。在依赖线程安全但不依赖同步细节的程序中,此类与 Hashtable 完全可互操作。