我有一个用例,其中快速生产者将数据插入队列,而慢速消费者使用数据。我面临的问题是队列大小随着时间的推移不断增加。我有一个类实现,其中 std::queue 受互斥锁和条件变量保护,用于并发读写。
这如何适应生产者在达到 MAX_THRESHOLD 直到停止将数据插入队列并且消费者已经消耗了一定量的数据信号生产者将数据插入队列的情况。
有人可以提供一个示例实现吗?
在不改变类实现的情况下,是否可以通过在生产者和消费者中添加另一层同步来解决这个问题?
我有一个用例,其中快速生产者将数据插入队列,而慢速消费者使用数据。我面临的问题是队列大小随着时间的推移不断增加。我有一个类实现,其中 std::queue 受互斥锁和条件变量保护,用于并发读写。
这如何适应生产者在达到 MAX_THRESHOLD 直到停止将数据插入队列并且消费者已经消耗了一定量的数据信号生产者将数据插入队列的情况。
有人可以提供一个示例实现吗?
在不改变类实现的情况下,是否可以通过在生产者和消费者中添加另一层同步来解决这个问题?
任何一个:
a) 如果队列大小达到 MAX_THRESHOLD,则使用有界队列类阻止生产者。这意味着更改您可能不想要的队列类。
b) 使用“池队列”——另一个无界阻塞队列,在启动时用 MAX_THRESHOLD 个对象填充。生产者从池中获取其对象,加载它们,排队到生产者。生产者从消费者那里获取对象,“消费”它们并将它们返回到池中。这有点要求使用您可能不想要的指针或引用。
c) 使用以 MAX_THRESHOLD 计数初始化的信号量以与 (b) 类似的方式表示消息令牌 - 生产者必须在排队之前获取一个单元,消费者在完成消息对象后发布一个单元。
我倾向于使用(b)。
带有 pthread 的“有界队列”的代码片段:
#include <queue>
#include <pthread.h>
template <class T, size_t UpperLimit>
class BoundedQueue {
std::queue<T> q_;
pthread_mutex_t mtx_;
pthread_cond_t cv_not_empry_;
pthread_cond_t cv_not_full_;
// lock/unlock helper
struct auto_locker {
auto_locker(pthread_mutex_t* pm) : pm_(pm)
{ pthread_mutex_lock(pm_); }
~auto_locker()
{ pthread_mutex_unlock(pm_);}
pthread_mutex_t *pm_;
};
public:
BoundedQueue() { /* initialize member... */ }
~BoundedQueue() { /* uninitialize member...*/ }
// for Producer
void push(T x) {
auto_locker lk(&mtx_);
while (UpperLimit <= q_.size()) {
pthread_cond_wait(&cv_not_full_, &mtx_);
}
q_.push(x);
pthread_cond_broadcast(&cv_not_empry_);
return ret;
}
// for Consumer
T pop() {
auto_locker lk(&mtx_);
while (q_.empty()) {
pthread_cond_wait(&cv_not_empry_, &mtx_);
}
T ret = q_.front();
q_.pop();
pthread_cond_broadcast(&cv_not_full_);
return ret;
}
}