0

当谈到使用 OpenMP 和 TBB 进行共享内存编程时,我是个初学者。

我正在实现 QuickHull 算法的并行版本(http://en.wikipedia.org/wiki/QuickHull)来查找一组数据点的凸包。(http://en.wikipedia.org/wiki/Convex_hull)。

基本上可以并行完成以下任务:

  1. 找到最左边和最右边的点(P 和 Q)。
  2. 根据这两个点(P 和 Q)的线连接拆分整个数据集。
  3. 对于这两组中的每一个,获取距离最后一次拆分发生的线 (PQ) 最远的点。
  4. 根据最远点 (C) 将数据分成两组:一组包含 PC 线右侧的所有元素,另一组包含 QC 线右侧的所有元素。

请注意,第 3 部分和第 4 部分是递归完成的,直到每个子集都为空。

首先,我使用 OpenMP 完成了这项工作,主要使用#pragma omp parallel for.... 但我个人认为我做错了什么,因为加速从未超过 2 倍。其次,我还使用 Intel TBB 进行了实现以比较加速,这导致了负加速(即使对于大型数据集)。使用 TBB,我同时使用了 tbb::parallel_for() 和 tbb::parallel_reduce()。

基本上我的问题可以分为两部分:1)OpenMP 实现 2)TBB 实现

第1部分

从下面的基准测试中可以看出,当我的数据集大小增加时,使用足够多的线程也会加快速度。

最多 10,000 个元素的小型集合的运行时 从 100,000 个元素开始的较大集合的运行时 加快比较 OpenMP 和顺序算法

请注意,加速并没有超过 2 倍,我个人认为这对这个算法来说是非常糟糕的,因为很大一部分是可并行的。以下是相关代码:

     void ompfindHull(POINT_VECTOR &input, Point* p1, Point* p2, POINT_VECTOR& output){
        // If there are no points in the set... just stop. This is the stopping criteria for the recursion. :)
        if (input.empty() || input.size() == 0) return;

        int num_threads = omp_get_max_threads();
        // Get the point that is the farthest from the p1-p2 segment
        Point** farthest_sub = new Point*[num_threads];
        double** distance_sub = new double*[num_threads];
        int thread_id;
        #pragma omp parallel private (thread_id)
        {
            thread_id = omp_get_thread_num();
            farthest_sub[thread_id] = input[0];
            distance_sub[thread_id] = new double(0);

            #pragma omp for
                for (int index = 1; index < input.size(); index++){
                    Point*a = p1;
                    Point*b = p2;
                    Point*c = input[index];

                    double distance = ( ( b->x - a->x ) * ( a->y - c->y ) ) - ( ( b->y - a->y ) * ( a->x - c->x ) );
                    distance = distance >= 0 ? distance : -distance;

                    double cur_distance = *distance_sub[thread_id];
                    if (cur_distance < distance){
                        farthest_sub[thread_id] = input[index];
                        distance_sub[thread_id] = new double(distance);
                    }
                }
        }

        Point* farthestPoint = farthest_sub[0];
        int distance = *distance_sub[0];
        for (int index = 1; index < num_threads; index++){
            if (distance < *distance_sub[index]){
                farthestPoint = farthest_sub[index];
            }
        }

        delete [] farthest_sub;
        delete [] distance_sub;

        // Add the farthest point to the output as it is part of the convex hull.
        output.push_back(farthestPoint);

        // Split in two sets.
        // The first one contains points right from p1 - farthestPoint
        // The second one contains points right from farthestPoint - p2
        vector<POINT_VECTOR> left_sub(num_threads), right_sub(num_threads);
        #pragma omp parallel private(thread_id)
        {
            thread_id = omp_get_thread_num();
            #pragma omp for
            for (size_t index = 0; index < input.size(); index++){
                Point* curPoint = input[index];
                if (curPoint != farthestPoint){
                    if (getPosition(p1, farthestPoint, curPoint) == RIGHT){
                        left_sub[thread_id].push_back(curPoint);
                    } else if (getPosition(farthestPoint, p2, curPoint) == RIGHT){
                        right_sub[thread_id].push_back(curPoint);
                    }
                }
            }
        }

        //Merge all vectors into a single vector :)
        POINT_VECTOR left, right;
        for (int index=0; index < num_threads; index++){
            left.insert(left.end(), left_sub[index].begin(), left_sub[index].end());
            right.insert(right.end(), right_sub[index].begin(), right_sub[index].end());
        }

        input.clear();


        // We do more recursion :)
        ompfindHull(left, p1, farthestPoint, output);
        ompfindHull(right, farthestPoint, p2, output);
     }

     double ompquickHull(POINT_VECTOR input, POINT_VECTOR& output){
        Timer timer;
        timer.start();

        // Find the left- and rightmost point.
        // We get the number of available threads.
        int num_threads = omp_get_max_threads();
        int thread_id;
        POINT_VECTOR minXPoints(num_threads);
        POINT_VECTOR maxXPoints(num_threads);

        // Devide all the points in subsets between several threads. For each of these subsets
        // we need to find the minX and maxX
        #pragma omp parallel shared(minXPoints,maxXPoints, input) private(thread_id)
        {
            thread_id = omp_get_thread_num();
            minXPoints[thread_id] = input[0];
            maxXPoints[thread_id] = input[0];

            int index;
            #pragma omp for
            for (index = 1; index < input.size(); index++)
            {
                Point* curPoint = input[index];
                if (curPoint->x > maxXPoints[thread_id]->x){
                    maxXPoints[thread_id] = curPoint;
                } else if (curPoint->x < minXPoints[thread_id]->x) {
                    minXPoints[thread_id] = curPoint;
                }
            }

            #pragma omp barrier

        }

        // We now have all the minX and maxX points of every single subset. We now use
        // these values to find the overall min and max X-point.
        Point* minXPoint = input[0], *maxXPoint = input[0];
        for (int index = 0; index < num_threads; index++){
            if (minXPoint->x > minXPoints[index]->x){
                minXPoint = minXPoints[index];
            }

            if (maxXPoint->x < maxXPoints[index]->x){
                maxXPoint = maxXPoints[index];
            }
        }

        // These points are sure to be part of the convex hull, so add them
        output.push_back(minXPoint);
        output.push_back(maxXPoint);

        // Now we have to split the set of point in subsets.
        // The first one containing all points above the line
        // The second one containing all points below the line
        const int size = input.size();
        vector<POINT_VECTOR> left_sub(num_threads), right_sub(num_threads);

        #pragma omp parallel private(thread_id)
        {
            thread_id = omp_get_thread_num();
            #pragma omp for
            for (unsigned int index = 0; index < input.size(); index++){
                Point* curPoint = input[index];
                if (curPoint != minXPoint || curPoint != maxXPoint){
                    if (getPosition(minXPoint, maxXPoint, curPoint) == RIGHT){
                        left_sub[thread_id].push_back(curPoint);
                    }
                    else if (getPosition(maxXPoint, minXPoint, curPoint) == RIGHT){
                        right_sub[thread_id].push_back(curPoint);
                    }
                }
            }
        }

        //Merge all vectors into a single vector :)
        POINT_VECTOR left, right;
        for (int index=0; index < num_threads; index++){
            left.insert(left.end(), left_sub[index].begin(), left_sub[index].end());
            right.insert(right.end(), right_sub[index].begin(), right_sub[index].end());
        }

        // We now have the initial two points belonging to the hill
        // We also split all the points into a group containing points left of AB and a group containing points right of of AB
        // We now recursively find all other points belonging to the convex hull.

        ompfindHull(left,minXPoint, maxXPoint, output);
        ompfindHull(right, maxXPoint, minXPoint, output);

        timer.end();

        return timer.getTimeElapsed();
     }

有谁知道使用 8 个内核仅实现 2 倍加速,而这么大一部分代码是可并行化的是否正常?如果没有,我在这里做错了什么!?

第2部分

现在真正的问题来了……

在 TBB 实现上运行相同的测试会得到以下结果: 小型数据集的运行时 大型数据集的运行时 加速图

可以看出,并行实现的运行时间总是超过顺序实现的运行时间。至于加速图加速比小于一,意味着它是一个负加速!

这是我创建的不同结构的代码:

注意typedef tbb::concurrent_vector<Point*> CPOINT_VECTOR

编辑:应用拱的评论。

    class FindExtremum{
public:
    enum ExtremumType{
        MINIMUM,MAXIMUM
    };

public:
    FindExtremum(CPOINT_VECTOR& points):fPoints(points), fMinPoint(points[0]), fMaxPoint(points[0]){}
    FindExtremum(const FindExtremum& extremum, tbb::split):fPoints(extremum.fPoints), fMinPoint(extremum.fMinPoint), fMaxPoint(extremum.fMaxPoint){}

    void join(const FindExtremum& other){
        Point* curMinPoint = other.fMinPoint;
        Point* curMaxPoint = other.fMaxPoint;

        if (isLargerThan(curMinPoint, MINIMUM)){
            fMinPoint = curMinPoint;
        }

        if (isSmallerThan(curMaxPoint, MAXIMUM)){
            fMaxPoint = curMaxPoint;
        }
    }

    void operator()(const BLOCKED_RANGE& range){
        for (size_t index = range.begin(); index < range.end(); index++){
            Point* curPoint = fPoints[index];

            if (isLargerThan(curPoint, MINIMUM)){
                fMinPoint = curPoint;
            }

            if (isSmallerThan(curPoint, MAXIMUM)){
                fMaxPoint = curPoint;
            }
        }
    }

private:
    bool isSmallerThan(const Point* point, const ExtremumType& type){
        switch (type){
        case MINIMUM:
            return fMinPoint->x < point->x;
        case MAXIMUM:
            return fMaxPoint->x < point->x;
        }
    }

    bool isLargerThan(const Point* point, const ExtremumType& type){
        return !isSmallerThan(point, type);
    }

public:
    Point* getMaxPoint(){
        return this->fMaxPoint;
    }

    Point* getMinPoint(){
        return this->fMinPoint;
    }

public:
    CPOINT_VECTOR fPoints;
    Point* fMinPoint;
    Point* fMaxPoint;

};

class Splitter{
public:
    Splitter(const CPOINT_VECTOR& points, Point* point1, Point* point2,
            Point* farthestPoint, CPOINT_VECTOR* left, CPOINT_VECTOR* right, int grainsize):
        fPoints(points), p1(point1), p2(point2), farthestPoint(farthestPoint), fLeft(left), fRight(right), fGrain(grainsize)
    {
        //fLeft = new tbb::concurrent_vector<Point*>();
        //fRight = new tbb::concurrent_vector<Point*>();
        //fLeft = new vector<Point*>();
        //fRight = new vector<Point*>();
    };

    Splitter(const Splitter& splitter, tbb::split):
        fPoints(splitter.fPoints), p1(splitter.p1), p2(splitter.p2), farthestPoint(splitter.farthestPoint),
        fLeft(splitter.fLeft), fRight(splitter.fRight), fGrain(splitter.fGrain){}

    void operator()(const BLOCKED_RANGE& range) const{
        const int grainsize = fGrain;
        Point** left = new Point*[grainsize];
        Point** right = new Point*[grainsize];
        int leftcounter = 0;
        int rightcounter = 0;
        for (size_t index = range.begin(); index < range.end(); index++){
            Point* curPoint = fPoints[index];
            if (curPoint != farthestPoint){
                if (getPosition(p1, farthestPoint, curPoint) == RIGHT){
                    left[leftcounter++] = curPoint;
                } else if (getPosition(farthestPoint, p2, curPoint) == RIGHT){
                    right[rightcounter++] = curPoint;
                }
            }
        }
        appendVector(left,leftcounter,*fLeft);
        appendVector(right,rightcounter,*fRight);
    }

public:
    Point* p1;
    Point* p2;
    Point* farthestPoint;
    int fGrain;
    CPOINT_VECTOR* fLeft;
    CPOINT_VECTOR* fRight;
    CPOINT_VECTOR fPoints;

};

class InitialSplitter{
public:
    InitialSplitter(const CPOINT_VECTOR& points, CPOINT_VECTOR* left, CPOINT_VECTOR* right,
                    Point* point1, Point* point2, int grainsize):
            fPoints(points), p1(point1), p2(point2), fLeft(left), fRight(right), fGrain(grainsize){}

    InitialSplitter(const InitialSplitter& splitter, tbb::split):
        fPoints(splitter.fPoints), p1(splitter.p1), p2(splitter.p2),
        fLeft(splitter.fLeft), fRight(splitter.fRight), fGrain(splitter.fGrain){
    }

    void operator()(const BLOCKED_RANGE& range) const{
        const int grainsize = fGrain;
        Point** left = new Point*[grainsize];
        Point** right = new Point*[grainsize];
        int leftcounter = 0;
        int rightcounter = 0;
        for (size_t index = range.begin(); index < range.end(); index++){
            Point* curPoint = fPoints[index];
            if (curPoint != p1 || curPoint != p2){
                if (getPosition(p1, p2, curPoint) == RIGHT){
                    left[leftcounter++] = curPoint;
                } else if (getPosition(p2, p1, curPoint) == RIGHT){
                    right[rightcounter++] = curPoint;
                }
            }
        }
        appendVector(left,leftcounter,*fLeft);
        appendVector(right,rightcounter,*fRight);
    }

public:
    CPOINT_VECTOR fPoints;
    int fGrain;
    Point* p1;
    Point* p2;
    CPOINT_VECTOR* fLeft;
    CPOINT_VECTOR* fRight;
};

class FarthestPointFinder{
public:
    FarthestPointFinder(const CPOINT_VECTOR& points, Point* p1, Point* p2):
        fPoints(points), fFarthestPoint(points[0]),fDistance(-1), p1(p1), p2(p2){}

    FarthestPointFinder(const FarthestPointFinder& fpf, tbb::split):
        fPoints(fpf.fPoints), fFarthestPoint(fpf.fFarthestPoint),fDistance(-1), p1(fpf.p1), p2(fpf.p2){}

    void operator()(const BLOCKED_RANGE& range){
        for (size_t index = range.begin(); index < range.end(); index++){
            Point* curPoint = fPoints[index];
            double curDistance = distance(p1,p2,curPoint);
            if (curDistance > fDistance){
                fFarthestPoint = curPoint;
                fDistance = curDistance;
            }
        }
    }

    void join(const FarthestPointFinder& other){
        if (fDistance < other.fDistance){
            fFarthestPoint = other.fFarthestPoint;
            fDistance = other.fDistance;
        }
    }

public:
    Point* getFarthestPoint(){
        return this->fFarthestPoint;
    }

public:
    CPOINT_VECTOR fPoints;
    Point* fFarthestPoint;
    int fDistance;
    Point* p1;
    Point* p2;
};

接下来是 QuickHull 代码:

   void tbbfindHull(CPOINT_VECTOR &input, Point* p1, Point* p2, POINT_VECTOR& output, int max_threads){
    // If there are no points in the set... just stop. This is the stopping criteria for the recursion. :)
    if (input.empty() || input.size() == 0) return;
    else if (input.size() == 1) {
        output.push_back(input[0]);
        return;
    }

    // Get the point that is the farthest from the p1-p2 segment

    int GRAINSIZE = ((double)input.size())/max_threads;

    FarthestPointFinder fpf(input, p1, p2);
    tbb::parallel_reduce(BLOCKED_RANGE(0,input.size(),GRAINSIZE), fpf);
    Point *farthestPoint = fpf.getFarthestPoint();

    // Add the farthest point to the output as it is part of the convex hull.
    output.push_back(farthestPoint);

    // Split in two sets.
    // The first one contains points right from p1 - farthestPoint
    // The second one contains points right from farthestPoint - p2

    CPOINT_VECTOR* left = new CPOINT_VECTOR();
    CPOINT_VECTOR* right = new CPOINT_VECTOR();

    Splitter splitter(input,p1,p2,farthestPoint, left, right, GRAINSIZE);
    tbb::parallel_for(BLOCKED_RANGE(0,input.size(), GRAINSIZE), splitter);

    // We do more recursion :)
    tbbfindHull(*left, p1, farthestPoint, output, max_threads);
    tbbfindHull(*right, farthestPoint, p2, output, max_threads);

}

/**
 * Calling the quickHull algorithm!
 */
double tbbquickHull(POINT_VECTOR input_o, POINT_VECTOR& output, int max_threads){

    CPOINT_VECTOR input;
    for (int i =0; i < input_o.size(); i++){
        input.push_back(input_o[i]);
    }

    int GRAINSIZE = input.size()/max_threads;
    Timer timer;
    timer.start();

    // Find the left- and rightmost point.
    FindExtremum fextremum(input);
    tbb::parallel_reduce(BLOCKED_RANGE(0, input.size(),GRAINSIZE), fextremum);

    Point* minXPoint = fextremum.getMinPoint();
    Point* maxXPoint = fextremum.getMaxPoint();

    // These points are sure to be part of the convex hull, so add them
    output.push_back(minXPoint);
    output.push_back(maxXPoint);

    // Now we have to split the set of point in subsets.
    // The first one containing all points above the line
    // The second one containing all points below the line
    CPOINT_VECTOR* left = new CPOINT_VECTOR;
    CPOINT_VECTOR* right = new CPOINT_VECTOR;

    //Timer temp1;
    //temp1.start();
    InitialSplitter splitter(input, left, right, minXPoint, maxXPoint, GRAINSIZE);
    tbb::parallel_for(BLOCKED_RANGE(0, input.size(),GRAINSIZE), splitter);
    // We now have the initial two points belonging to the hill
    // We also split all the points into a group containing points left of AB and a group containing points right of of AB
    // We now recursively find all other points belonging to the convex hull.

    tbbfindHull(*left,minXPoint, maxXPoint, output, max_threads);
    tbbfindHull(*right, maxXPoint, minXPoint, output, max_threads);
    timer.end();

    return timer.getTimeElapsed();
}

在 TBB 中,当对代码的不同并行部分进行计时时,可能会注意到一些异常情况。使用 tbb::parallel_for() 将整个子集初始划分InitialSplitter为两个子集所花费的时间几乎与相应 OpenMP 版本的整个运行时间一样长,并且当使用不同数量的线程时,这个时间不会发生变化。InitialSplitter这很奇怪,因为在作为参数传递给 tbb::parallel_for() 的 -object内部进行计时时可以注意到显着的加速。在中迭代的 for 循环InitialSplitters显示了当线程数增加时预期的加速。

我认为一个单一的tbb::parallel_for()——例如在初始化中获取一个InitialSplitter实例的那个——需要与整个 OpenMP 实现一样长的时间来运行,这很奇怪。我还认为很奇怪,当tbb::parallel_for()可以观察到没有加速的时间时,而在InitialSplitters-operator() 内部的时间可以观察到接近线性的加速......

这里有没有人可以帮助我!?

提前致谢!

4

2 回答 2

1

要获得 QuickHull 算法的并行加速,需要对数据结构格外小心。在大多数情况下,顶层分区确实支配了运行时间,所以它应该是并行的。但是,使用 concurrent_vector::push_back 会引入很多跨线程缓存争用,因为不同的线程会推送到相同的缓存行(所谓的“错误共享”),并且这些线程会争夺具有 concurrent_vector 大小的缓存行。

在 TBB 发行版中,在目录 examples/parallel_reduce/convex_hull/ 中查看一个可以加速的完整 QuickHull 算法。它使用parallel_reduce(不是parallel_for)来进行顶层分区。它确实使用和 concurrent_vector,但在将它们复制到 concurrent_vector 之前对它们进行批处理,以最大限度地减少错误共享问题。

于 2013-05-23T17:44:31.163 回答
1

我有一些你可能会觉得有用的评论。

一般来说,我尽量避免直接访问线程号(thread_id)。而不是定义大小等于线程数的数组,您应该在并行块中定义变量(这会自动将它们设为私有)。然后,您应该使用原子的、关键的或单一的方法,而不是在并行块之后循环线程数(我不确定哪种方法最好)。像这样的东西。

Point* farthestPoint;
//int distance = *distance_sub[0];  //is this a bug shouldn't distance be a double?
double distance = 0
#pragma omp parallel private
{
    Point farthest_sub = input[0];
    double distance_sub = 0;
     #pragma omp for nowait
     for (int index = 1; index < input.size(); index++){
     // for loop code
     }
     #pragma omp critical 
     {        
         if (distance < distance_sub){
            farthestPoint = farthest_sub;
         }
     }
     #pragma omp barrier 
     //next part of code 
}

您现在可能遇到的一个问题是虚假共享。每个线程都试图写入同一缓存行中的数组(例如,在数组 distance_sub[num_threads] 中)。当您在并行块内声明值时,我不确定 OpenMP 会做什么,但我怀疑它更有可能分配值以避免错误共享。

您的另一条评论应该尽量避免多次调用 OpenMP,尤其是在少数元素上。OpenMP 存在开销。我会尝试使用屏障或其他任何东西在一个并行块中获得尽可能多的东西。

另外,在您的代码中,int distance = *distance_sub[0]这是一个错误吗?距离不应该是双倍的吗?

最后,一个迂腐的观点。我怀疑你有8个核心。由于英特尔超线程,您可能有 4 个内核和 8 个硬件线程。区别可能很重要。

于 2013-05-23T11:35:40.180 回答