9

假设我有一个多线程 C++ 程序,它以函数调用的形式处理请求handleRequest(string key)。每次调用都handleRequest发生在一个单独的线程中,并且有任意大量的可能值key

我想要以下行为:

  • 当对 的同时调用handleRequest(key)具有相同的值时,它们将被序列化key
  • 全局序列化被最小化。

的主体handleRequest可能如下所示:

void handleRequest(string key) {
    KeyLock lock(key);
    // Handle the request.
}

问题:我将如何实现KeyLock以获得所需的行为?

一个天真的实现可能会像这样开始:

KeyLock::KeyLock(string key) {
    global_lock->Lock();
    internal_lock_ = global_key_map[key];
    if (internal_lock_  == NULL) {
        internal_lock_  = new Lock();
        global_key_map[key] = internal_lock_;
    }
    global_lock->Unlock();
    internal_lock_->Lock();
}

KeyLock::~KeyLock() {
    internal_lock_->Unlock();
    // Remove internal_lock_ from global_key_map iff no other threads are waiting for it.
}

...但这需要在每个请求的开始和结束时使用全局锁,并为每个请求创建一个单独的Lock对象。如果对 的调用之间的争用很高handleRequest,那可能不是问题,但如果争用很低,它可能会带来很多开销。

4

6 回答 6

12

你可以做一些类似于你在你的问题中所做的事情,但不是单个 global_key_map 有几个(可能在数组或向量中) - 使用哪个是由字符串上的一些简单哈希函数确定的。

这样,您可以将其分散到几个独立的锁上,而不是单个全局锁。

这是内存分配器中经常使用的一种模式(我不知道该模式是否有名字——应该有)。当一个请求进来时,一些东西决定了分配将来自哪个池(通常是请求的大小,但其他参数也可以考虑在内),然后只需要锁定那个池。如果分配请求来自另一个将使用不同池的线程,则不会出现锁争用。

于 2008-10-03T18:38:17.573 回答
2

这将取决于平台,但我尝试的两种技术是:

  • 使用命名互斥/同步对象,其中对象名称 = 键
  • 使用基于文件系统的锁定,尝试使用密钥名称创建不可共享的临时文件。如果它已经存在(=已经锁定),这将失败,您必须轮询重试

这两种技术都取决于您的操作系统的细节。试验一下,看看哪个有效。.

于 2008-10-03T18:39:14.467 回答
2

也许 anstd::map<std::string, MutexType>就是你想要的,你想要MutexType的互斥锁的类型在哪里。您可能必须将对映射的访问包装在另一个互斥锁中,以确保没有其他线程同时插入(并记住在互斥锁锁定后再次执行检查以确保另一个线程没有添加等待互斥锁时的键!)。

相同的原则可以适用于任何其他同步方法,例如临界区。

于 2008-10-03T18:39:45.573 回答
2

提高粒度并锁定整个键范围

这是迈克 B 答案的一个变体,其中没有几个流体锁映射,而是有一个固定的锁阵列,适用于键范围而不是单个键。

简化示例:在启动时创建 256 个锁的数组,然后使用 key 的第一个字节来确定要获取的锁的索引(即所有以 'k' 开头的 key 将由 保护locks[107])。

为了维持最佳吞吐量,您应该分析密钥的分布和争用率。这种方法的好处是零动态分配和简单的清理;您还可以避免两步锁定。不利的一面是,如果密钥分配随着时间的推移而出现偏差,则可能会出现争用高峰。

于 2008-10-03T22:18:07.410 回答
0

想了想,另一种方法可能是这样的:

  • handleRequest中,创建一个Callback执行实际工作的。
  • 创建一个multimap<string, Callback*> global_key_map, 受互斥体保护。
  • 如果一个线程看到它key已经被处理,它将把它添加Callback*global_key_map并返回。
  • 否则,它会立即调用其回调,然后调用同时出现的同一个键的回调。

实现了这样的事情:

LockAndCall(string key, Callback* callback) {
    global_lock.Lock();
    if (global_key_map.contains(key)) {
        iterator iter = global_key_map.insert(key, callback);
        while (true) {
            global_lock.Unlock();
            iter->second->Call();
            global_lock.Lock();
            global_key_map.erase(iter);
            iter = global_key_map.find(key);
            if (iter == global_key_map.end()) {
                global_lock.Unlock();
                return;
            }
        }
    } else {
        global_key_map.insert(key, callback);
        global_lock.Unlock();
    }
}

这样做的好处是可以释放等待键锁的线程,但除此之外,它与我在问题中发布的幼稚解决方案几乎相同。

不过,它可以与 Mike B 和 Constantin 给出的答案结合起来。

于 2008-10-03T22:47:16.133 回答
0
      /**
      * StringLock class for string based locking mechanism
      * e.g. usage
      *     StringLock strLock;
      *     strLock.Lock("row1");
      *     strLock.UnLock("row1");
      */
      class StringLock    {
      public:
          /**
           * Constructor
           * Initializes the mutexes
           */
          StringLock()    {
              pthread_mutex_init(&mtxGlobal, NULL);
          }
          /**
           * Lock Function
           * The thread will return immediately if the string is not locked
           * The thread will wait if the string is locked until it gets a turn
           * @param string the string to lock
           */
          void Lock(string lockString)    {
              pthread_mutex_lock(&mtxGlobal);
              TListIds *listId = NULL;
              TWaiter *wtr = new TWaiter;
              wtr->evPtr = NULL;
              wtr->threadId = pthread_self();
              if (lockMap.find(lockString) == lockMap.end())    {
                  listId = new TListIds();
                  listId->insert(listId->end(), wtr);
                  lockMap[lockString] = listId;
                  pthread_mutex_unlock(&mtxGlobal);
              } else    {
                  wtr->evPtr = new Event(false);
                  listId = lockMap[lockString];
                  listId->insert(listId->end(), wtr);
                  pthread_mutex_unlock(&mtxGlobal);
                  wtr->evPtr->Wait();
              }
          }
          /**
          * UnLock Function
          * @param string the string to unlock
          */
          void UnLock(string lockString)    {
              pthread_mutex_lock(&mtxGlobal);
              TListIds *listID = NULL;
              if (lockMap.find(lockString) != lockMap.end())    {
                  lockMap[lockString]->pop_front();
                  listID = lockMap[lockString];
                  if (!(listID->empty()))    {
                      TWaiter *wtr = listID->front();
                      Event *thdEvent = wtr->evPtr;
                      thdEvent->Signal();
                  } else    {
                      lockMap.erase(lockString);
                      delete listID;
                  }
              }
              pthread_mutex_unlock(&mtxGlobal);
          }
      protected:
          struct TWaiter    {
              Event *evPtr;
              long threadId;
          };
          StringLock(StringLock &);
          void operator=(StringLock&);
          typedef list TListIds;
          typedef map TMapLockHolders;
          typedef map TMapLockWaiters;
      private:
          pthread_mutex_t mtxGlobal;
          TMapLockWaiters lockMap;
      };
于 2009-01-05T03:57:42.973 回答