45

我正在将一些 Java 代码移植到 C++ 中,其中一个特定部分使用 BlockingQueue 将消息从许多生产者传递给单个消费者。

如果您不熟悉 Java BlockingQueue 是什么,它只是一个具有硬容量的队列,它向队列中的 put() 和 take() 公开线程安全方法。如果队列已满,put() 阻塞,如果队列为空,则 take() 阻塞。此外,还提供了这些方法的超时敏感版本。

超时与我的用例相关,因此提供这些的建议是理想的。如果没有,我可以自己编写一些代码。

我用谷歌搜索并快速浏览了 Boost 库,但没有找到类似的东西。也许我在这里是盲人......但有人知道一个好的推荐吗?

谢谢!

4

4 回答 4

58

它不是固定大小,也不支持超时,但这是我最近使用 C++ 2011 构造发布的队列的简单实现:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

扩展和使用定时等待弹出应该是微不足道的。我没有这样做的主要原因是我对到目前为止所想到的界面选择不满意。

于 2012-10-09T17:56:40.910 回答
6

这是具有关闭请求功能的阻塞队列的示例:

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};
于 2018-09-01T12:27:01.353 回答
0

你应该先写信号量的类

#ifndef SEMEPHORE_H
#define SEMEPHORE_H
#include <mutex>
#include <condition_variable>

class semephore {
public:
    semephore(int count = 0)
        : count(count),
          m(),
          cv()
    {

    }

    void await() {
        std::unique_lock<std::mutex> lk(m);
        --count;
        if (count < 0) {
            cv.wait(lk);
        }
    }

    void post() {
        std::unique_lock<std::mutex> lk(m);
        ++count;
        if (count <= 0) {
            cv.notify_all();
        }
    }
    
private:
    int count;
    std::mutex m;
    std::condition_variable cv;
};

#endif // SEMEPHORE_H

现在blocked_queue可以使用信号量来处理它

#ifndef BLOCKED_QUEUE_H
#define BLOCKED_QUEUE_H
#include <list>
#include "semephore.h"

template <typename T>
class blocked_queue {
public:
    blocked_queue(int count) 
        : s_products(),
          s_free_space(count),
          li()
    {

    }

    void put(const T &t) {
        s_free_space.await();
        li.push_back(t);
        s_products.post();
    }

    T take() {
        s_products.await();
        T res = li.front();
        li.pop_front();
        s_free_space.post();
        return res;
    }
private:
    semephore s_products;
    semephore s_free_space;
    std::list<T> li;
};

#endif // BLOCKED_QUEUE_H

于 2021-11-28T07:15:18.207 回答
0

好的,我参加聚会有点晚了,但我认为这更适合 Java 的BlockingQueue实现。在这里我也使用一个互斥锁和两个条件来照顾不满不空。IMO aBlockingQueue在容量有限的情况下更有意义,我在其他答案中没有看到。我也包括一个简单的测试场景:

#include <iostream>
#include <algorithm>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>

template<typename T>
class blocking_queue {
private:
    size_t _capacity;
    std::queue<T> _queue;
    std::mutex _mutex;
    std::condition_variable _not_full;
    std::condition_variable _not_empty;

public:
    inline blocking_queue(size_t capacity) : _capacity(capacity) {
        // empty
    }

    inline size_t size() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.size();
    }

    inline bool empty() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    inline void push(const T& elem) {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is full
            while (_queue.size() >= _capacity) {
                _not_full.wait(lock);
            }
            std::cout << "pushing element " << elem << std::endl;
            _queue.push(elem);
        }
        _not_empty.notify_all();
    }

    inline void pop() {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is empty
            while (_queue.size() == 0) {
                _not_empty.wait(lock);
            }
            std::cout << "popping element " << _queue.front() << std::endl;
            _queue.pop();
        }
        _not_full.notify_one();
    }

    inline const T& front() {
        std::unique_lock<std::mutex> lock(_mutex);

        // wait while the queue is empty
        while (_queue.size() == 0) {
            _not_empty.wait(lock);
        }
        return _queue.front();
    }
};

int main() {
    blocking_queue<int> queue(5);

    // create producers
    std::vector<std::thread> producers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.push(i);
            // produces too fast
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }));
    }

    // create consumers
    std::vector<std::thread> consumers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.pop();
            // consumes too slowly
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }));
    }

    std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
        thread.join();
    });

    std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
        thread.join();
    });

    return EXIT_SUCCESS;
}
于 2019-08-27T21:39:39.887 回答