36

首先:我完全是互斥锁/多线程编程的新手,很抱歉提前出现任何错误......

我有一个运行多个线程的程序。线程(通常每个 cpu 核心一个)进行大量计算和“思考”,然后有时他们决定调用特定(共享)方法来更新一些统计信息。统计更新的并发性是通过使用互斥锁来管理的:

stats_mutex.lock();
common_area->update_thread_stats( ... );
stats_mutex.unlock();

现在来解决问题。在所有这些线程中,有一个特定的线程需要几乎
实时的优先级,因为它是唯一实际运行的线程。

我的意思是“几乎实时优先”:

假设线程 t0 是“特权线程”,而 t1....t15 是正常线程。现在发生的是:

  • 线程 t1 获得锁。
  • 线程 t2, t3, t0 调用 lock() 方法并等待它成功。
  • 线程 t1 调用 unlock()
  • 线程 t2、t3、t0 中的一个(据我所知是随机的)成功获取锁,而其他线程继续等待。

我需要的是:

  • 线程 t1 获取锁。
  • 线程 t2, t3, t0 调用 lock() 方法并等待它成功。
  • 线程 t1 调用 unlock()
  • 线程 t0 获得锁,因为它有特权

那么,做这件事最好的(可能是最简单的)方法是什么?

我在想的是有一个名为“privileged_needs_lock”的布尔变量。

但我认为我需要另一个互斥锁来管理对这个变量的访问......我不知道这是否是正确的方法......

附加信息:

  • 我的线程使用 C++11(从 gcc 4.6.3 开始)
  • 代码需要在 Linux 和 Windows 上运行(但目前仅在 Linux 上测试)。
  • 锁定机制的性能不是问题(我的性能问题是内部线程计算,线程数总是很低,每个 cpu 核心最多一到两个)

任何想法都值得赞赏。谢谢


以下解决方案有效(三种互斥方式):

#include <thread>
#include <iostream>
#include <mutex>
#include "unistd.h"

std::mutex M;
std::mutex N;
std::mutex L;

void lowpriolock(){
  L.lock();
  N.lock();
  M.lock();
  N.unlock();
}

void lowpriounlock(){
  M.unlock();
  L.unlock();
}

void highpriolock(){
  N.lock();
  M.lock();
  N.unlock();
}

void highpriounlock(){
  M.unlock();
}

void hpt(const char* s){
  using namespace std;
  //cout << "hpt trying to get lock here" << endl;
  highpriolock();
  cout << s << endl;
  sleep(2);
  highpriounlock();
}

void lpt(const char* s){
  using namespace std;
  //cout << "lpt trying to get lock here" << endl;
  lowpriolock();
  cout << s << endl;
  sleep(2);
  lowpriounlock();
}

int main(){
std::thread t0(lpt,"low prio t0 working here");
std::thread t1(lpt,"low prio t1 working here");
std::thread t2(hpt,"high prio t2 working here");
std::thread t3(lpt,"low prio t3 working here");
std::thread t4(lpt,"low prio t4 working here");
std::thread t5(lpt,"low prio t5 working here");
std::thread t6(lpt,"low prio t6 working here");
std::thread t7(lpt,"low prio t7 working here");
//std::cout << "All threads created" << std::endl;
t0.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
return 0;
}

按照建议尝试了以下解决方案,但它不起作用(使用“ g++ -std=c++0x -o test test.cpp -lpthread”编译):

#include <thread>
#include <mutex>

#include "time.h"
#include "pthread.h"

std::mutex l;

void waiter(){
  l.lock();
  printf("Here i am, waiter starts\n");
  sleep(2);
  printf("Here i am, waiter ends\n");
  l.unlock();
}

void privileged(int id){
  usleep(200000);
  l.lock();
  usleep(200000);
  printf("Here i am, privileged (%d)\n",id);
  l.unlock();  
}

void normal(int id){
  usleep(200000);
  l.lock();
  usleep(200000);
  printf("Here i am, normal (%d)\n",id);
  l.unlock();    
}

int main(){
  std::thread tw(waiter);
  std::thread t1(normal,1);
  std::thread t0(privileged,0);
  std::thread t2(normal,2);

  sched_param sch;
  int policy; 

  pthread_getschedparam(t0.native_handle(), &policy, &sch);
  sch.sched_priority = -19;
  pthread_setschedparam(t0.native_handle(), SCHED_FIFO, &sch);

  pthread_getschedparam(t1.native_handle(), &policy, &sch);
  sch.sched_priority = 18;
  pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch);

  pthread_getschedparam(t2.native_handle(), &policy, &sch);
  sch.sched_priority = 18;
  pthread_setschedparam(t2.native_handle(), SCHED_FIFO, &sch);
  
  tw.join();
  t1.join();
  t0.join();
  t2.join();

  return 0;  
}
4

8 回答 8

52

我可以想到三种仅使用线程原语的方法:

三重互斥锁

三个互斥锁可以在这里工作:

  • 数据互斥锁 ('M')
  • 下一个访问互斥锁 ('N'),以及
  • 低优先级访问互斥锁 ('L')

访问模式是:

  • 低优先级线程:lock L, lock N, lock M, unlock N, { do stuff }, unlock M, unlock L
  • 高优先级线程:lock N, lock M, unlock N, { do stuff }, unlock M

这样就可以保护对数据的访问,并且高优先级线程可以在访问数据方面领先于低优先级线程。

互斥体、条件变量、原子标志

执行此操作的原始方法是使用条件变量和原子:

  • 互斥体 M;
  • 康德瓦 C;
  • 原子布尔 hpt_waiting;

数据访问模式:

  • 低优先级线程:lock M, while (hpt_waiting) wait C on M, { do stuff }, 广播 C, unlock M
  • 高优先级线程: hpt_waiting := true, lock M, hpt_waiting := false, { do stuff }, 广播 C, unlock M

互斥体、条件变量、两个非原子标志

或者,您可以将两个非原子布尔值与 condvar 一起使用;在这种技术中,mutex/condvar 保护标志,并且数据不是由互斥锁而是由标志保护的:

  • 互斥体 M;

  • 康德瓦 C;

  • bool data_held, hpt_waiting;

  • 低优先级线程:lock M, while (hpt_waiting or data_held) wait C on M, data_held := true, unlock M, { do stuff }, lock M, data_held := false, broadcast C, unlock M

  • 高优先级线程:lock M, hpt_waiting := true, while (data_held) wait C on M, data_held := true, unlock M, { do stuff }, lock M, data_held := false, hpt_waiting := false, 广播 C , 解锁 M

于 2012-07-26T16:15:54.163 回答
8

将请求线程放在“优先队列”上。特权线程可以在空闲时首先获取数据。

一种方法是使用 ConcurrentQueues[privilegeLevel] 数组、一个锁和一些事件。

任何想要获取数据的线程都会进入锁。如果数据是空闲的,(布尔值),它会获取数据对象并退出锁。如果数据正在被另一个线程使用,请求线程会根据其特权级别将事件推送到并发队列中,退出锁并等待事件。

当一个线程想要释放它对数据对象的所有权时,它会获得锁并从最高权限的一端向下迭代 ConcurrentQueues 数组,寻找一个事件(即队列计数>0)。如果它找到一个,它发出信号并退出锁,如果没有,它设置'dataFree'布尔值并退出锁。

当等待事件以访问数据的线程准备就绪时,它可以访问数据对象。

我认为这应该可行。请其他开发人员检查此设计,看看您是否可以想到任何种族等?在去 CZ 之后,我仍然有些“热情好客”的痛苦。

编辑 - 可能甚至不需要并发队列,因为它们之间的显式锁定。任何旧队列都可以。

于 2012-07-26T11:04:30.677 回答
3
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cassert>

class priority_mutex {
  std::condition_variable cv_;
  std::mutex gate_;
  bool locked_;
  std::thread::id pr_tid_; // priority thread
public:
  priority_mutex() : locked_(false) {}
  ~priority_mutex() { assert(!locked_); }
  priority_mutex(priority_mutex&) = delete;
  priority_mutex operator=(priority_mutex&) = delete;

  void lock(bool privileged = false) {
    const std::thread::id tid = std::this_thread::get_id();
    std::unique_lock<decltype(gate_)> lk(gate_);
    if (privileged)
      pr_tid_ = tid;
    cv_.wait(lk, [&]{
      return !locked_ && (pr_tid_ == std::thread::id() || pr_tid_ == tid);
    });
    locked_ = true;
  }

  void unlock() {
    std::lock_guard<decltype(gate_)> lk(gate_);
    if (pr_tid_ == std::this_thread::get_id())
      pr_tid_ = std::thread::id();
    locked_ = false;
    cv_.notify_all();
  }
};

注意:priority_mutex提供了不公平的线程调度。如果特权线程频繁获取锁,其他非特权线程可能几乎不会被调度。

使用示例:

#include <mutex>
priority_mutex mtx;

void privileged_thread()
{
  //...
  {
    mtx.lock(true);  // acquire 'priority lock'
    std::unique_lock<decltype(mtx)> lk(mtx, std::adopt_lock);
    // update shared state, etc.
  }
  //...
}

void normal_thread()
{
  //...
  {
    std::unique_lock<decltype(mtx)> lk(mtx);  // acquire 'normal lock'
    // do something
  }
  //...
}
于 2012-08-03T02:00:06.523 回答
2

pthreads 具有线程优先级:

pthread_setschedprio( (pthread_t*)(&mThreadId), wpri );

如果多个线程在一个锁中休眠等待,调度程序将首先唤醒最高优先级的线程。

于 2012-07-26T14:35:39.063 回答
2

在 linux 上,您可以查看这个人:pthread_setschedparam 和 man sched_setscheduler

pthread_setschedparam(pthread_t thread, int policy, const struct sched_pa​​ram *param);

也检查一下 c++2011:http: //msdn.microsoft.com/en-us/library/system.threading.thread.priority.aspx#Y78

于 2012-07-26T09:48:04.153 回答
1

尝试类似以下的操作。你可以让这个类成为一个线程安全的单例,你甚至可以让它成为一个仿函数。

#include <pthread.h>
#include <semaphore.h>
#include <map>

class ThreadPrioFun
{
    typedef std::multimap<int, sem_t*> priomap_t;
public:
    ThreadPrioFun()
    {
        pthread_mutex_init(&mtx, NULL);
    }
    ~ThreadPrioFun()
    {
        pthread_mutex_destroy(&mtx);
    }
    void fun(int prio, sem_t* pSem)
    {
        pthread_mutex_lock(&mtx);
        bool bWait = !(pm.empty());
        priomap_t::iterator it = pm.insert(std::pair<int, sem_t*>(prio, pSem) );
        pthread_mutex_unlock(&mtx);

        if( bWait ) sem_wait(pSem);

        // do the actual job
        // ....
        //

        pthread_mutex_lock(&mtx);
        // done, remove yourself
        pm.erase(it);
        if( ! pm.empty() )
        {
             // let next guy run:
            sem_post((pm.begin()->second));
        }
        pthread_mutex_unlock(&mtx);
    }
private:
    pthread_mutex_t mtx;
    priomap_t pm;
};
于 2012-07-26T14:21:23.717 回答
0

稍微修改了ecatmur答案,添加了第四个互斥锁来同时处理多个高优先级线程(请注意,这在我的原始问题中不是必需的):

#include <thread>
#include <iostream>
#include "unistd.h"

std::mutex M; //data access mutex
std::mutex N; // 'next to access' mutex
std::mutex L; //low priority access mutex
std::mutex H; //hptwaiting int access mutex

int hptwaiting=0;

void lowpriolock(){
  L.lock();
  while(hptwaiting>0){
    N.lock();
    N.unlock();
  }
  N.lock();
  M.lock();
  N.unlock();
}

void lowpriounlock(){
  M.unlock();
  L.unlock();
}

void highpriolock(){
  H.lock();
  hptwaiting++;
  H.unlock();
  N.lock();
  M.lock();
  N.unlock();
}

void highpriounlock(){
  M.unlock();
  H.lock();
  hptwaiting--;
  H.unlock();
}

void hpt(const char* s){
  using namespace std;
  //cout << "hpt trying to get lock here" << endl;
  highpriolock();
  cout << s << endl;
  usleep(30000);
  highpriounlock();
}

void lpt(const char* s){
  using namespace std;
  //cout << "lpt trying to get lock here" << endl;
  lowpriolock();
  cout << s << endl;
  usleep(30000);
  lowpriounlock();
}

int main(){
std::thread t0(lpt,"low  prio t0  working here");
std::thread t1(lpt,"low  prio t1  working here");
std::thread t2(hpt,"high prio t2  working here");
std::thread t3(lpt,"low  prio t3  working here");
std::thread t4(lpt,"low  prio t4  working here");
std::thread t5(lpt,"low  prio t5  working here");
std::thread t6(hpt,"high prio t6  working here");
std::thread t7(lpt,"low  prio t7  working here");
std::thread t8(hpt,"high prio t8  working here");
std::thread t9(lpt,"low  prio t9  working here");
std::thread t10(lpt,"low  prio t10 working here");
std::thread t11(lpt,"low  prio t11 working here");
std::thread t12(hpt,"high prio t12 working here");
std::thread t13(lpt,"low  prio t13 working here");
//std::cout << "All threads created" << std::endl;
t0.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
t8.join();
t9.join();
t10.join();
t11.join();
t12.join();
t13.join();
return 0;
}

你怎么看?可以吗?确实,信号量可以更好地处理这种事情,但对我来说互斥锁更容易管理。

于 2012-07-27T00:59:21.680 回答
0

由于线程优先级不适合您:

创建 2 个互斥锁,一个常规锁和一个优先级锁。

普通线程必须先锁普通锁,然后再锁优先级锁。优先级线程只需锁定优先级锁:

Mutex mLock;
Mutex mPriLock;


doNormal()
{
   mLock.lock();
   pthread_yield();
   doPriority();
   mLock.unlock();
}

doPriority()
{
   mPriLock.lock();
   doStuff();
   mPriLock.unlock();
}
于 2012-07-26T15:11:33.350 回答