3

我正在尝试为我正在处理的项目在 C++ 中实现生产者/消费者模型多线程程序。基本思想是主线程创建第二个线程来监视串行端口的新数据,处理数据并将结果放入由主线程定期轮询的缓冲区中。我以前从未编写过多线程程序。我一直在阅读很多教程,但它们都是用 C 语言编写的。我想我已经掌握了基本概念,但我正在尝试对其进行 c++ 化。对于缓冲区,我想创建一个内置互斥保护的数据类。这就是我想出的。

1)我是不是走错了路?有没有更聪明的方法来实现受保护的数据类?

2) 如果两个线程同时尝试调用,下面的代码会发生什么ProtectedBuffer::add_back()

#include <deque>
#include "pthread.h"

template <class T>
class ProtectedBuffer {
  std::deque<T> buffer;
  pthread_mutex_t mutex;
public:
  void add_back(T data) {
    pthread_mutex_lock(&mutex);
    buffer.push_back(data);
    pthread_mutex_unlock(&mutex);
  }
  void get_front(T &data) {
    pthread_mutex_lock(&mutex);
    data = buffer.front();
    buffer.pop_front();
    pthread_mutex_unlock(&mutex);
  }
};

编辑:感谢所有伟大的建议。我试图在下面实现它们。我还添加了一些错误检查,所以如果一个线程以某种方式设法尝试锁定同一个互斥锁两次,它将优雅地失败。我认为。

#include "pthread.h"
#include <deque>


class Lock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit Lock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_lock(&m);
        if (error == 0) {
            locked = true;
        } else {
            locked = false;
        }
    }
    ~Lock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() {
        return locked;
    }
};

class TryToLock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit TryToLock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_trylock(&m);
        if (error == 0) {
            locked = true;
        } else {
            locked = false;
        }
    }
    ~TryToLock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() {
        return locked;
    }
};

template <class T>
class ProtectedBuffer{
    pthread_mutex_t mutex;
    pthread_mutexattr_t mattr;
    std::deque<T> buffer;
    bool failbit;

    ProtectedBuffer(const ProtectedBuffer& x);
    ProtectedBuffer& operator= (const ProtectedBuffer& x);
public:
    ProtectedBuffer() {
        pthread_mutexattr_init(&mattr);
        pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK);
        pthread_mutex_init(&mutex, &mattr);
        failbit = false;
    }
    ~ProtectedBuffer() {
        pthread_mutex_destroy(&mutex);
        pthread_mutexattr_destroy(&mattr);
    }
    void add_back(T &data) {
        Lock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
    void get_front(T &data) {
        Lock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_get_front(T &data) {
        TryToLock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_add_back(T &data) {
        TryToLock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
};
4

4 回答 4

9

几件事:

  • 您需要在构造函数中初始化mutex并在析构函数中pthread_mutex_init释放它。pthread_mutex_destroy

  • 您必须使您的类不可复制和不可分配(或以其他方式正确实现复制构造函数和赋值运算符;见上文)。

  • 为锁创建一个 SBRM 助手类是值得的:

    class Lock
    {
        pthread_mutex_t & m;
    public:
        explicit Lock(pthread_mutex_t & _m) : m(_m) { pthread_mutex_lock(&m); }
        ~Lock() { pthread_mutex_unlock(&m); }
    };
    

    现在您可以制作一个同步范围,例如{ Lock lk(mutex); /* ... */ }.

至于问题2:并发访问是通过锁定互斥锁的方式进行序列化的。竞争线程之一将在获取互斥锁时休眠。

于 2012-07-24T23:27:39.890 回答
1

我会以错误的方式解决这个问题吗?有没有更聪明的方法来实现受保护的数据类?

对于您的实施,我认为您有一个良好的开端。既然您询问了 C++ifying,那么如果您有一个支持 C++11 的编译器,则可以使用新的线程支持。

您提到您希望主线程轮询此缓冲区,但我没有看到任何允许它这样做的机制。当缓冲区中没有任何get_front内容时应该提供错误,或者get_buffer应该阻止调用者直到数据可用。

#include <deque>
#include <mutex>
#include <condition_variable>
#include <stdexcept>

template <class T>
class ProtectedBuffer {
  std::deque<T> buffer;
  std::mutex mtx;
  std::condition_variable empty_cnd;
  void get_front_i(T &data) {
    data = buffer.front();
    buffer.pop_front();
  }
public:
  void add_back(T data) {
    std::lock_guard<std::mutex> g(mtx);
    bool was_empty = buffer.empty();
    buffer.push_back(data);
    if (was_empty) empty_cnd.notify_one();
  }
  void get_front_check(T &data) {
    std::lock_guard<std::mutex> g(mtx);
    if (buffer.empty()) throw std::underflow_error("no data");
    get_front_i(data);
  }
  void get_front_block(T &data) {
    std::lock_guard<std::mutex> g(mtx);
    std::unique_lock<std::mutex> u(mtx);
    while (buffer.empty()) empty_cnd.wait(u);
    get_front_i(data);
    if (!buffer.empty()) empty_cnd.notify_one();
  }
};

如果您想限制添加到缓冲区的数据量,您可以添加一个类似的full_cnd条件变量来检查add_back调用将等待的完整条件(如果为真)。然后,该get_front_i方法可以在缓冲区不再满时发出信号。

如果两个线程同时尝试调用 ProtectedBuffer::add_back(),下面的代码会发生什么?

由于add_back受到互斥保护,如果两个线程同时调用它,一个线程将被阻止调用push_back,直到另一个线程完成。

于 2012-07-25T00:15:35.213 回答
0

你有基础知识,但我会更进一步,将互斥锁本身包装在它自己的 RAII 包装器中,例如:

#include <deque> 
#include "pthread.h" 

class ProtectedMutex
{
  pthread_mutex_t &mutex; 
public:
  ProtectedMutex(pthread_mutex_t &m)
    : mutex(m); 
  {
    pthread_mutex_lock(&mutex); 
  }
  ~ProtectedMutex()
  {
    pthread_mutex_unlock(&mutex); 
  }
};

template <class T> 
class ProtectedBuffer { 
  std::deque<T> buffer; 
  pthread_mutex_t mutex; 
public: 
  void add_back(T data) { 
    ProtectedMutex m(mutex); 
    buffer.push_back(data); 
  } 
  void get_front(T &data) { 
    ProtectedMutex m(mutex); 
    data = buffer.front(); 
    buffer.pop_front(); 
  } 
}; 
于 2012-07-24T23:32:56.620 回答
0

' 将结果放入由主线程定期轮询的缓冲区中' - CPU 浪费和延迟。

“我是不是走错路了?” - 是的。我不知道您的系统对辅助线程<> GUI 线程通信有什么样的支持,但总有 PostMessage() API。

当然,您需要一个 Buffer 类,它具有串行 rx 数据的数据成员和执行协议/“处理数据”的方法。你不需要太多其他东西。在您的第二个线程中,创建一个缓冲区类实例。加载它,处理数据和 PostMessage/dispatch/BeginInvoke 它指向您的 GUI 线程的指针。在串行线程的下一行代码中,在同一个实例指针 var 中创建另一个实例,以便从串行端口加载下一次数据。在 GUI 中显示/记录/任何内容之后,GUI 线程应该删除()它收到的 *Buffer。

没有延迟,没有 CPU 浪费,没有数据复制,串行线程和 GUI 线程不可能在同一个缓冲区实例上工作,没有讨厌、复杂的缓冲区共享代码,没有锁,没有麻烦。它会运作良好。

其他任何事情都会变得混乱。

编辑-忘记了(2)-不知道,。不会用驳船杆碰它..

于 2012-07-24T23:42:52.357 回答