0

我的程序生产者线程从文本文件中读取文本行(大约有 8000 行文本)并将这些行加载到并发队列中。

三个消费者线程从队列中读取行,每个线程写入一个单独的文件。

当我运行程序时,只有生产者线程和消费者线程之一完成。其他两个线程似乎挂起。

我如何可靠地告诉所有消费者线程已到达文件末尾,因此它们应该返回但确保队列完全为空。

我的平台是 Windows 7 64 位

VC11。

编译为 64 位和 32 位的代码具有相同的行为。

这是代码。(它是自包含和可编译的)

#include <queue>
#include<iostream>
#include<fstream>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <mutex>
#include<string>
#include<memory>


template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable std::mutex the_mutex;
    std::condition_variable the_condition_variable;
public:
    void push(Data const& data){
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            the_queue.push(data);
        }
        the_condition_variable.notify_one();
    }

    bool empty() const{
        std::unique_lock<std::mutex> lock(the_mutex);
        return the_queue.empty();
    }

    const size_t size() const{
        std::lock_guard<std::mutex> lock(the_mutex);
        return the_queue.size();
    }

    bool try_pop(Data& popped_value){
        std::unique_lock<std::mutex> lock(the_mutex);
        if(the_queue.empty()){
            return false;
        }
        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value){
        std::unique_lock<std::mutex> lock(the_mutex);
        while(the_queue.empty()){
            the_condition_variable.wait(lock);
        }
        popped_value=the_queue.front();
        the_queue.pop();
    }
};

std::atomic<bool> done(true);
typedef std::vector<std::string> segment;
concurrent_queue<segment> data;
const int one_block = 15;

void producer()
{
    done.store(false);
    std::ifstream inFile("c:/sample.txt");
    if(!inFile.is_open()){
        std::cout << "Can't read from file\n";
        return;
    }

    std::string line;
    segment seg;
    int cnt = 0;
    while(std::getline(inFile,line)){
        seg.push_back(line);
        ++cnt;
        if( cnt == one_block ){
            data.push( seg );
            seg.clear();
            cnt = 0;
        }
    }
    inFile.close();
    done.store(true);
    std::cout << "all done\n";
}

void consumer( std::string fname)
{
    std::ofstream outFile(fname.c_str());
    if(!outFile.is_open()){
        std::cout << "Can't write to file\n";
        return;
    }

    do{
        while(!data.empty()){
            segment seg;
            data.wait_and_pop( seg );
            for(size_t i = 0; i < seg.size(); ++i)
            {
                outFile << seg[i] << std::endl;
            }
            outFile.flush();
        }
    } while(!done.load());
    outFile.close();
    std::cout << fname << "  done.\n";
}

int main()
{
    std::thread th0(producer);
    std::thread th1(consumer, "Worker1.txt");
    std::thread th2(consumer, "Worker2.txt");
    std::thread th3(consumer, "Worker3.txt");

    th0.join();
    th1.join();
    th2.join();
    th3.join();

    return 0;
}
4

3 回答 3

1

我用来终止在队列中等待的所有线程的方法是在队列上设置一个标志,说明它是否已完成,在检查pop()函数中是否存在元素之前对其进行测试。如果标志指示程序应该停止,pop()则如果队列中没有元素,则任何线程调用都会引发异常。当标志改变时,改变线程只调用notify_all()相应的条件变量。

于 2012-12-29T20:54:25.763 回答
0

看下面的代码:

while(!data.empty()){
    segment seg;
    data.wait_and_pop( seg );
    ...

考虑要读取最后一段数据的情况。消费者正在等待数据被读取th1th2

消费者th1检查!data.empty()并发现有要读取的数据。然后在th1调用之前data.wait_and_pop(),消费者th2检查!data.empty()并发现它是真的。假设th1消费者消费了最后一段。现在,由于没有要读取的段,因此th2无限期地等待the_queue.empty()in data.wait_and_pop()

试试这个代码而不是上面的代码片段:

segment seg;
while(data.try_pop(seg)){
    ...

应该让它工作。

于 2012-12-29T20:53:12.523 回答
0

您可能想在concurrent_queue. 读取文件后设置它(在互斥锁下)。读取文件队列为空后,使用 . 从清空队列的消费者那里广播条件变量notify_all

这将唤醒所有其他需要发现最终条件(标志集和队列为空)并退出循环的消费者。为了避免竞争条件,这意味着他们需要在等待之前检查相同的组合条件。

您现有标志的问题是永远不会从等待 condvar 中醒来的线程,永远不会检查它。“完成”标志需要是他们正在等待的状态的一部分。

[编辑:Dietmar 对标志的微妙不同含义可能会导致代码更简单,但我还没有将它们都写出来进行比较。]

于 2012-12-29T20:54:48.093 回答