我尝试使用 Intel TBB 使用文件读取、排序、文件写入阶段编写双音排序的流水线版本,如下所示。代码在 while(!outQueue.try_pop(line)); 处冻结在自旋锁处;在 FileWriter 过滤器中。有人可以解释为什么会这样吗?
更新:我做了一些进一步的测试,发现头文件 _concurrent_queue_internal.h 中 try_pop 调用的 internal_try_pop 有一个 compare_and_swap 操作,对于这个特定的 try_pop,它永远失败。以下是我从 internal_try_pop 中提取的值
head counter(k)15
tail counter1605177747
item ticket(tk)15
k after head CAS 15
(k=tk)15,15---break!!
我认为尾部计数器值是垃圾。对于这种情况,我能想到的唯一原因是排序器添加到队列的值可能会被它隐式修改,从而使其不可用。
有任何想法吗?
谢谢 :)
#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"
using namespace tbb;
using namespace std;
// Filter that writes lines to the output file.
class FileWriterFilter: public tbb::filter {
string outPath;
public:
FileWriterFilter(string outPath );
/*override*/void* operator()( void* item );
};
FileWriterFilter::FileWriterFilter( string outPath ) :
tbb::filter(/*is_serial=*/true),
outPath(outPath)
{
}
void* FileWriterFilter::operator()( void* item ) {
concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item);
string line;
while(!outQueue.try_pop(line));
ofstream myfile(outPath);
if (myfile.is_open())
{
myfile <<line<<endl;
}
//myfile.close();
return NULL;
}
class FileReaderFilter: public tbb::filter {
public:
FileReaderFilter(string inPath);
private:
ifstream ifs;
tbb::concurrent_queue<string> queue;
/*override*/ void* operator()(void*);
};
FileReaderFilter::FileReaderFilter(string inPath ) :
filter(/*is_serial=*/true),
ifs(inPath)
{
}
void* FileReaderFilter::operator()(void*) {
string temp;
if( getline( ifs, temp ))
{
queue.push(temp);
}
return &queue;
}
class BitonicSort: public tbb::filter{
public:
BitonicSort();
/*override*/void* operator()( void* item );
size_t *a;
private : static const bool ASCENDING=true, DESCENDING=false;
public :void sort(size_t *b,int n)
{
a=b;
bitonicSort(0,n,ASCENDING);
}
private: void bitonicSort(int lo,int n,bool dir)
{
if(n>1)
{
int m=n/2;
bitonicSort(lo,m,ASCENDING);
bitonicSort(lo+m,m,DESCENDING);
bitonicMerge(lo,n,dir);
}
}
private : void bitonicMerge(int lo,int n,bool dir)
{
if(n>1)
{
int m=n/2;
for(int i=lo;i<lo+m;i++)
{
compare(i,i+m,dir);
}
bitonicMerge(lo,m,dir);
bitonicMerge(lo+m,m,dir);
}
}
private : void compare(int i,int j, bool dir)
{
if(dir==a[i]>a[j])
{
exchange(i,j);
}
}
private : void exchange(int i,int j)
{
/* cout<<a[i]<<" "<<a[j]<<endl;*/
int t=a[i];
a[i]=a[j];
a[j]=t;
/*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/
}
private :string convertInt(int number)
{
stringstream ss;//create a stringstream
ss << number;//add number to the stream
return ss.str();//return a string with the contents of the stream
}
};
BitonicSort::BitonicSort() :
tbb::filter(/*serial=*/false)
{}
/*override*/void* BitonicSort::operator()( void* item ) {
int num_elem=2048;
size_t *max = new size_t[num_elem];
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item);
concurrent_queue<string> outQueue;
string line;
while(!queue.try_pop(line));
istringstream iss(line);
int i=0;
do
{
string sub;
iss >> sub;
max[i]=atoi(sub.c_str());;
i++;
} while (iss);
sort(max,num_elem);
string out;
for(int i=0;i<num_elem;i++)
{
out.append(convertInt(max[i]).append(" "));
}
outQueue.push(out);
return &outQueue;
}
int main() {
tbb::pipeline pipeline;
FileReaderFilter reader("sample.txt");
pipeline.add_filter(reader);
BitonicSort sorter;
pipeline.add_filter(sorter);
FileWriterFilter writer("test.txt");
pipeline.add_filter(writer);
pipeline.run(3);
pipeline.clear();
system("PAUSE");
}