我的程序生产者线程从文本文件中读取文本行(大约有 8000 行文本)并将这些行加载到并发队列中。
三个消费者线程从队列中读取行,每个线程写入一个单独的文件。
当我运行程序时,只有生产者线程和消费者线程之一完成。其他两个线程似乎挂起。
我如何可靠地告诉所有消费者线程已到达文件末尾,因此它们应该返回但确保队列完全为空。
我的平台是 Windows 7 64 位
VC11。
编译为 64 位和 32 位的代码具有相同的行为。
这是代码。(它是自包含和可编译的)
#include <queue>
#include<iostream>
#include<fstream>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <mutex>
#include<string>
#include<memory>
template<typename Data>
class concurrent_queue
{
private:
std::queue<Data> the_queue;
mutable std::mutex the_mutex;
std::condition_variable the_condition_variable;
public:
void push(Data const& data){
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push(data);
}
the_condition_variable.notify_one();
}
bool empty() const{
std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.empty();
}
const size_t size() const{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.size();
}
bool try_pop(Data& popped_value){
std::unique_lock<std::mutex> lock(the_mutex);
if(the_queue.empty()){
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
void wait_and_pop(Data& popped_value){
std::unique_lock<std::mutex> lock(the_mutex);
while(the_queue.empty()){
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop();
}
};
std::atomic<bool> done(true);
typedef std::vector<std::string> segment;
concurrent_queue<segment> data;
const int one_block = 15;
void producer()
{
done.store(false);
std::ifstream inFile("c:/sample.txt");
if(!inFile.is_open()){
std::cout << "Can't read from file\n";
return;
}
std::string line;
segment seg;
int cnt = 0;
while(std::getline(inFile,line)){
seg.push_back(line);
++cnt;
if( cnt == one_block ){
data.push( seg );
seg.clear();
cnt = 0;
}
}
inFile.close();
done.store(true);
std::cout << "all done\n";
}
void consumer( std::string fname)
{
std::ofstream outFile(fname.c_str());
if(!outFile.is_open()){
std::cout << "Can't write to file\n";
return;
}
do{
while(!data.empty()){
segment seg;
data.wait_and_pop( seg );
for(size_t i = 0; i < seg.size(); ++i)
{
outFile << seg[i] << std::endl;
}
outFile.flush();
}
} while(!done.load());
outFile.close();
std::cout << fname << " done.\n";
}
int main()
{
std::thread th0(producer);
std::thread th1(consumer, "Worker1.txt");
std::thread th2(consumer, "Worker2.txt");
std::thread th3(consumer, "Worker3.txt");
th0.join();
th1.join();
th2.join();
th3.join();
return 0;
}