6

在一个简单的 VS2012 控制台应用程序中,我无法让代码可靠地工作,该应用程序由使用 C++11 条件变量的生产者和消费者组成。我的目标是生成一个小的可靠程序(用作更复杂程序的基础),它使用 3 个参数 wait_for 方法或我在这些网站上收集的代码中的 wait_until 方法:

条件变量: wait_forwait_until

我想将 3 个参数 wait_for 与如下所示的谓词一起使用,但它需要使用类成员变量才能在以后对我最有用。仅运行大约一分钟后,我收到“访问冲突写入位置 0x_ _ ”或“无效参数已传递给服务或函数”作为错误。

stable_clock 和 2 参数 wait_until 是否足以替换 3 参数 wait_for?我也试过这个没有成功。

有人可以展示如何让下面的代码无限期地运行而没有错误或奇怪的行为,无论是从夏令时更改挂钟时间还是互联网时间同步?

指向可靠示例代码的链接可能同样有帮助。

// ConditionVariable.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"

#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>
#include <atomic>

#define TEST1

std::atomic<int> 
//int 
    qcount = 0; //= ATOMIC_VAR_INIT(0);

int _tmain(int argc, _TCHAR* argv[])
{
    std::queue<int> produced_nums;
    std::mutex m;
    std::condition_variable cond_var;
    bool notified = false;
    unsigned int count = 0;

    std::thread producer([&]() {
        int i = 0;
        while (1) {
            std::this_thread::sleep_for(std::chrono::microseconds(1500));
            std::unique_lock<std::mutex> lock(m);
            produced_nums.push(i);
            notified = true;
            qcount = produced_nums.size();
            cond_var.notify_one();
            i++;
        }   
        cond_var.notify_one();
    }); 

    std::thread consumer([&]() {
        std::unique_lock<std::mutex> lock(m);
        while (1) {
#ifdef TEST1
            // Version 1
            if (cond_var.wait_for(
                lock,
                std::chrono::microseconds(1000),
                [&]()->bool { return qcount != 0; }))
            {
                if ((count++ % 1000) == 0)
                    std::cout << "consuming " << produced_nums.front    () << '\n';
                produced_nums.pop();
                qcount = produced_nums.size();
                notified = false;
            }
#else
            // Version 2
            std::chrono::steady_clock::time_point timeout1 =
                std::chrono::steady_clock::now() +
                //std::chrono::system_clock::now() +
                std::chrono::milliseconds(1);

            while (qcount == 0)//(!notified)
            {
                if (cond_var.wait_until(lock, timeout1) == std::cv_status::timeout)
                    break;
            }

            if (qcount > 0)
            {
                if ((count++ % 1000) == 0)
                std::cout << "consuming " << produced_nums.front() << '\n';
                produced_nums.pop();
                qcount = produced_nums.size();
                notified = false;
            }
#endif
        }
    });

    while (1);
    return 0;
}

Visual Studio Desktop Express 安装了 1 个重要更新,而 Windows Update 没有其他重要更新。我正在使用 Windows 7 32 位。

4

2 回答 2

5

可悲的是,这实际上是 VS2012 的 condition_variable 实现中的一个错误,并且不会修补此修复程序。您必须在 VS2013 发布时升级到它。

看:

http://connect.microsoft.com/VisualStudio/feedback/details/762560

于 2013-09-24T02:39:58.870 回答
0

首先,在使用condition_variables 时,我个人更喜欢AutoResetEventC# 中的一些包装类:

struct AutoResetEvent
{
    typedef std::unique_lock<std::mutex> Lock;

    AutoResetEvent(bool state = false) :
        state(state)
    { }

    void Set()
    {
        auto lock = AcquireLock();
        state = true;
        variable.notify_one();
    }

    void Reset()
    {
        auto lock = AcquireLock();
        state = false;
    }

    void Wait(Lock& lock)
    {
        variable.wait(lock, [this] () { return this->state; });
        state = false;
    }

    void Wait()
    {
        auto lock = AcquireLock();
        Wait(lock);
    }

    Lock AcquireLock()
    {
        return Lock(mutex);
    }
private:

    bool state;
    std::condition_variable variable;
    std::mutex mutex;
};

这可能与 C# 类型的行为不同,或者可能没有应有的效率,但它可以为我完成任务。

其次,当我需要实现生产/消费习惯时,我会尝试使用并发队列实现(例如tbb queue)或为自己编写一个。但是您还应该考虑使用Active Object Pattern来解决问题。但是对于简单的解决方案,我们可以使用这个:

template<typename T>
struct ProductionQueue
{
    ProductionQueue()
    { }

    void Enqueue(const T& value)
    {
        {
            auto lock = event.AcquireLock();
            q.push(value);
        }
        event.Set();
    }

    std::size_t GetCount()
    {
        auto lock = event.AcquireLock();

        return q.size();
    }

    T Dequeue()
    {
        auto lock = event.AcquireLock();
        event.Wait(lock);

        T value = q.front();
        q.pop();

        return value;
    }

private:
    AutoResetEvent event;
    std::queue<T> q;
};

这个类有一些异常安全问题并且错过了方法的常量性,但就像我说的,对于一个简单的解决方案,这应该适合。

因此,您修改后的代码如下所示:

int main(int argc, char* argv[])
{
    ProductionQueue<int> produced_nums;
    unsigned int count = 0;

    std::thread producer([&]() {
        int i = 0;
        while (1) {
            std::this_thread::sleep_for(std::chrono::microseconds(1500));
            produced_nums.Enqueue(i);
            qcount = produced_nums.GetCount();
            i++;
        }
    }); 

    std::thread consumer([&]() {
        while (1) {
            int item = produced_nums.Dequeue();
            {
                if ((count++ % 1000) == 0)
                    std::cout << "consuming " << item << '\n';
                qcount = produced_nums.GetCount();
            }
        }
    });

    producer.join();
    consumer.join();

    return 0;
}
于 2013-01-12T23:46:44.903 回答