16

我有一个将拜耳图像通道转换为 RGB 的算法。在我的实现中,我有一个嵌套for循环,它遍历拜耳通道,从拜耳索引计算 rgb 索引,然后从拜耳通道设置该像素的值。这里要注意的主要事情是每个像素都可以独立于其他像素进行计算(不依赖于先前的计算),因此该算法是并行化的自然候选者。然而,计算确实依赖于所有线程将同时访问但不会改变的一些预设数组。

但是,当我尝试将主要for与 MS并行化时cuncurrency::parallel_for,性能并没有得到任何提升。事实上,对于在 4 核 CPU 上运行的大小为 3264X2540 的输入,非并行版本的运行时间约为 34 毫秒,并行版本的运行时间约为 69 毫秒(平均超过 10 次运行)。我确认该操作确实是并行化的(为该任务创建了 3 个新线程)。

使用英特尔的编译器tbb::parallel_for给出了接近准确的结果。作为比较,我从实现的算法开始,C#其中我还使用了parallel_for循环,在那里我遇到了接近 X4 的性能提升(我选择了这个算法,C++因为C++即使使用单核,这个特定任务也更快)。

有什么想法阻止我的代码很好地并行化吗?

我的代码:

template<typename T>
void static ConvertBayerToRgbImageAsIs(T* BayerChannel, T* RgbChannel, int Width, int Height, ColorSpace ColorSpace)
{
        //Translates index offset in Bayer image to channel offset in RGB image
        int offsets[4];
        //calculate offsets according to color space
        switch (ColorSpace)
        {
        case ColorSpace::BGGR:
            offsets[0] = 2;
            offsets[1] = 1;
            offsets[2] = 1;
            offsets[3] = 0;
            break;
        ...other color spaces
        }
        memset(RgbChannel, 0, Width * Height * 3 * sizeof(T));
        parallel_for(0, Height, [&] (int row)
        {
            for (auto col = 0, bayerIndex = row * Width; col < Width; col++, bayerIndex++)
            {
                auto offset = (row%2)*2 + (col%2); //0...3
                auto rgbIndex = bayerIndex * 3 + offsets[offset];
                RgbChannel[rgbIndex] = BayerChannel[bayerIndex];
            }
        });
}
4

8 回答 8

22

首先,您的算法是 memory bandwidth bounded。也就是说,内存加载/存储将超过您所做的任何索引计算。

SSE/之类的向量运算AVX也无济于事-您没有进行任何密集计算。

增加每次迭代的工作量也是没有用的——两者PPLTBB足够聪明,不会在每次迭代中创建线程,他们会使用一些好的分区,这会额外尝试保持局部性。例如,这里引用自TBB::parallel_for

当工作线程可用时,parallel_for执行迭代是不确定的顺序。不要依赖任何特定的执行顺序来确保正确性。但是,为了提高效率,确实期望 parallel_for 倾向于对 values 的连续运行进行操作

真正重要的是减少内存操作。对输入或输出缓冲区的任何多余遍历都会影响性能,因此您应该尝试删除您的memset或并行执行。

您正在完全遍历输入和输出数据。即使您在输出中跳过某些内容 - 这并不重要,因为现代硬件上的内存操作是通过 64 字节块进行的。因此,计算size您的输入和输出、time算法度量、划分size/time比较结果与系统的最大特征(例如,用benchmark度量)。

我对 和 进行了测试,Microsoft PPL结果是(我使用了你身高的 8 倍):OpenMPNative for

Native_For       0.21 s
OpenMP_For       0.15 s
Intel_TBB_For    0.15 s
MS_PPL_For       0.15 s

如果删除memset则:

Native_For       0.15 s
OpenMP_For       0.09 s
Intel_TBB_For    0.09 s
MS_PPL_For       0.09 s

如您所见memset(高度优化的)对大量执行时间负责,这表明您的算法是如何受内存限制的。

完整的源代码

#include <boost/exception/detail/type_info.hpp>
#include <boost/mpl/for_each.hpp>
#include <boost/mpl/vector.hpp>
#include <boost/progress.hpp>
#include <tbb/tbb.h>
#include <iostream>
#include <ostream>
#include <vector>
#include <string>
#include <omp.h>
#include <ppl.h>

using namespace boost;
using namespace std;

const auto Width = 3264;
const auto Height = 2540*8;

struct MS_PPL_For
{
    template<typename F,typename Index>
    void operator()(Index first,Index last,F f) const
    {
        concurrency::parallel_for(first,last,f);
    }
};

struct Intel_TBB_For
{
    template<typename F,typename Index>
    void operator()(Index first,Index last,F f) const
    {
        tbb::parallel_for(first,last,f);
    }
};

struct Native_For
{
    template<typename F,typename Index>
    void operator()(Index first,Index last,F f) const
    {
        for(; first!=last; ++first) f(first);
    }
};

struct OpenMP_For
{
    template<typename F,typename Index>
    void operator()(Index first,Index last,F f) const
    {
        #pragma omp parallel for
        for(auto i=first; i<last; ++i) f(i);
    }
};

template<typename T>
struct ConvertBayerToRgbImageAsIs
{
    const T* BayerChannel;
    T* RgbChannel;
    template<typename For>
    void operator()(For for_)
    {
        cout << type_name<For>() << "\t";
        progress_timer t;
        int offsets[] = {2,1,1,0};
        //memset(RgbChannel, 0, Width * Height * 3 * sizeof(T));
        for_(0, Height, [&] (int row)
        {
            for (auto col = 0, bayerIndex = row * Width; col < Width; col++, bayerIndex++)
            {
                auto offset = (row % 2)*2 + (col % 2); //0...3
                auto rgbIndex = bayerIndex * 3 + offsets[offset];
                RgbChannel[rgbIndex] = BayerChannel[bayerIndex];
            }
        });
    }
};

int main()
{
    vector<float> bayer(Width*Height);
    vector<float> rgb(Width*Height*3);
    ConvertBayerToRgbImageAsIs<float> work = {&bayer[0],&rgb[0]};
    for(auto i=0;i!=4;++i)
    {
        mpl::for_each<mpl::vector<Native_For, OpenMP_For,Intel_TBB_For,MS_PPL_For>>(work);
        cout << string(16,'_') << endl;
    }
}
于 2013-04-15T18:08:55.577 回答
5

同步开销

我猜想循环的每次迭代完成的工作量太小了。如果您将图像分成四部分并并行运行计算,您会注意到很大的增益。尝试以减少迭代次数和每次迭代更多工作的方式设计循环。这背后的原因是同步完成了太多。

缓存使用

一个重要的因素可能是数据如何拆分(分区)以进行处理。如果处理的行在下面的坏情况下被分开,那么更多的行将导致缓存未命中。每增加一个线程,这种效果就会变得更加重要,因为行之间的距离会更大。如果您确定并行化功能执行合理的分区,那么手动工作拆分不会给出任何结果

 bad       good
****** t1 ****** t1
****** t2 ****** t1
****** t1 ****** t1
****** t2 ****** t1
****** t1 ****** t2
****** t2 ****** t2
****** t1 ****** t2
****** t2 ****** t2

还要确保您以相同的对齐方式访问您的数据;每次调用offset[]和都可能BayerChannel[]是缓存未命中。您的算法非常占用内存。几乎所有操作都是访问内存段或写入内存段。防止缓存未命中和最小化内存访问至关重要。

代码优化

下面显示的优化可能由编译器完成,可能不会给出更好的结果。值得知道他们可以做到。

    // is the memset really necessary?
    //memset(RgbChannel, 0, Width * Height * 3 * sizeof(T));
    parallel_for(0, Height, [&] (int row)
    {
        int rowMod = (row & 1) << 1;
        for (auto col = 0, bayerIndex = row * Width, tripleBayerIndex=row*Width*3; col < Width; col+=2, bayerIndex+=2, tripleBayerIndex+=6)
        {
            auto rgbIndex = tripleBayerIndex + offsets[rowMod];
            RgbChannel[rgbIndex] = BayerChannel[bayerIndex];

            //unrolled the loop to save col & 1 operation
            rgbIndex = tripleBayerIndex + 3 + offsets[rowMod+1];
            RgbChannel[rgbIndex] = BayerChannel[bayerIndex+1];
        }
    });
于 2013-04-15T08:42:10.637 回答
3

我的建议来了:

  1. 并行计算更大的块
  2. 摆脱模/乘法
  3. 展开内部循环以计算一个完整像素(简化代码)

    template<typename T> void static ConvertBayerToRgbImageAsIsNew(T* BayerChannel, T* RgbChannel, int Width, int Height)
    {
        // convert BGGR->RGB
        // have as many threads as the hardware concurrency is
        parallel_for(0, Height, static_cast<int>(Height/(thread::hardware_concurrency())), [&] (int stride)
        {
            for (auto row = stride; row<2*stride; row++)
            {
                for (auto col = row*Width, rgbCol =row*Width; col < row*Width+Width; rgbCol +=3, col+=4)
                {
                    RgbChannel[rgbCol+0]  = BayerChannel[col+3];
                    RgbChannel[rgbCol+1]  = BayerChannel[col+1];
                    // RgbChannel[rgbCol+1] += BayerChannel[col+2]; // this line might be left out if g is used unadjusted
                    RgbChannel[rgbCol+2]  = BayerChannel[col+0];
                }
            }
        });
    }
    

这段代码比原始版本快 60%,但仍然只有笔记本电脑上非并行版本的一半。正如其他人已经指出的那样,这似乎是由于算法的内存有界性。

编辑:但我对此并不满意。parallel_for从到时,我可以大大提高并行性能std::async

int hc = thread::hardware_concurrency();
future<void>* res = new future<void>[hc];
for (int i = 0; i<hc; ++i)
{
    res[i] = async(Converter<char>(bayerChannel, rgbChannel, rows, cols, rows/hc*i, rows/hc*(i+1)));
}
for (int i = 0; i<hc; ++i)
{
    res[i].wait();
}
delete [] res;

转换器是一个简单的类:

template <class T> class Converter
{
public:
Converter(T* BayerChannel, T* RgbChannel, int Width, int Height, int startRow, int endRow) : 
    BayerChannel(BayerChannel), RgbChannel(RgbChannel), Width(Width), Height(Height), startRow(startRow), endRow(endRow)
{
}
void operator()()
{
    // convert BGGR->RGB
    for(int row = startRow; row < endRow; row++)
    {
        for (auto col = row*Width, rgbCol =row*Width; col < row*Width+Width; rgbCol +=3, col+=4)
        {
            RgbChannel[rgbCol+0]  = BayerChannel[col+3];
            RgbChannel[rgbCol+1]  = BayerChannel[col+1];
            // RgbChannel[rgbCol+1] += BayerChannel[col+2]; // this line might be left out if g is used unadjusted
            RgbChannel[rgbCol+2]  = BayerChannel[col+0];
        }
    };
}
private:
T* BayerChannel;
T* RgbChannel;
int Width;
int Height;
int startRow;
int endRow;
};

这现在比非并行版本快 3.5 倍。从我目前在分析器中看到的情况来看,我假设 parallel_for 的工作窃取方法会导致大量等待和同步开销。

于 2013-04-19T09:10:59.370 回答
2

我没有使用 tbb::parallel_for 而不是 cuncurrency::parallel_for,但如果你的数字是正确的,它们似乎会带来太多的开销。但是,我强烈建议您在测试时运行 10 次以上的迭代,并确保在计时之前进行尽可能多的热身迭代。

我使用三种不同的方法准确地测试了你的代码,平均超过 1000 次尝试。

Serial:      14.6 += 1.0  ms
std::async:  13.6 += 1.6  ms
workers:     11.8 += 1.2  ms

首先是串行计算。第二个是使用四个对 std::async 的调用来完成的。最后一个是通过将四个作业发送到四个已经启动(但正在休眠)的后台线程来完成的。

收益不大,但至少是收益。我在 2012 MacBook Pro 上进行了测试,双超线程内核 = 4 个逻辑内核。

作为参考,这是我的 std::async 并行:

template<typename Int=int, class Fun>
void std_par_for(Int beg, Int end, const Fun& fun)
{
    auto N = std::thread::hardware_concurrency();
    std::vector<std::future<void>> futures;

    for (Int ti=0; ti<N; ++ti) {
        Int b = ti * (end - beg) / N;
        Int e = (ti+1) * (end - beg) / N;
        if (ti == N-1) { e = end; }

        futures.emplace_back( std::async([&,b,e]() {
            for (Int ix=b; ix<e; ++ix) {
                fun( ix );
            }
        }));
    }

    for (auto&& f : futures) {
        f.wait();
    }
}
于 2013-04-15T13:05:43.653 回答
2

要检查或要做的事情

  • 您使用的是 Core 2 或更旧的处理器吗?他们有一个非常狭窄的内存总线,很容易用这样的代码饱和。相比之下,4 通道 Sandy Bridge-E 处理器需要多个线程来使内存总线饱和(单个内存绑定线程不可能使其完全饱和)。
  • 你填充了所有的记忆通道吗?例如,如果您有一个双通道 CPU,但只安装了一个 RAM 卡或两个在同一通道上,您将获得一半的可用带宽。
  • 你如何计时你的代码?
    • 应该像 Evgeny Panasyuk 建议的那样在应用程序内部完成计时。
    • 您应该在同一个应用程序中进行多次运行。否则,您可能正在计时一次性启动代码以启动线程池等。
  • memset正如其他人所解释的那样,删除多余的 。
  • 正如 ogni42 和其他人所建议的那样,展开你的内部循环(我没有费心检查该解决方案的正确性,但如果它是错误的,你应该能够修复它)。这与并行化的主要问题是正交的,但无论如何这是一个好主意。
  • 在进行性能测试时,请确保您的机器处于空闲状态。

额外的时间

我已将 Evgeny Panasyuk 和 ogni42 的建议合并到一个简单的 C++03 Win32 实现中:

#include "stdafx.h"

#include <omp.h>
#include <vector>
#include <iostream>
#include <stdio.h>

using namespace std;

const int Width = 3264;
const int Height = 2540*8;

class Timer {
private:
    string name;
    LARGE_INTEGER start;
    LARGE_INTEGER stop;
    LARGE_INTEGER frequency;
public:
    Timer(const char *name) : name(name) {
        QueryPerformanceFrequency(&frequency);
        QueryPerformanceCounter(&start);
    }

    ~Timer() {
        QueryPerformanceCounter(&stop);
        LARGE_INTEGER time;
        time.QuadPart = stop.QuadPart - start.QuadPart;
        double elapsed = ((double)time.QuadPart /(double)frequency.QuadPart);
        printf("%-20s : %5.2f\n", name.c_str(), elapsed);
    }
};

static const int offsets[] = {2,1,1,0};

template <typename T>
void Inner_Orig(const T* BayerChannel, T* RgbChannel, int row)
{
    for (int col = 0, bayerIndex = row * Width;
         col < Width; col++, bayerIndex++)
    {
        int offset = (row % 2)*2 + (col % 2); //0...3
        int rgbIndex = bayerIndex * 3 + offsets[offset];
        RgbChannel[rgbIndex] = BayerChannel[bayerIndex];
    }
}

// adapted from ogni42's answer
template <typename T>
void Inner_Unrolled(const T* BayerChannel, T* RgbChannel, int row)
{
    for (int col = row*Width, rgbCol =row*Width;
         col < row*Width+Width; rgbCol +=3, col+=4)
    {
        RgbChannel[rgbCol+0]  = BayerChannel[col+3];
        RgbChannel[rgbCol+1]  = BayerChannel[col+1];
        // RgbChannel[rgbCol+1] += BayerChannel[col+2]; // this line might be left out if g is used unadjusted
        RgbChannel[rgbCol+2]  = BayerChannel[col+0];
    }
}

int _tmain(int argc, _TCHAR* argv[])
{
    vector<float> bayer(Width*Height);
    vector<float> rgb(Width*Height*3);
    for(int i = 0; i < 4; ++i)
    {
        {
            Timer t("serial_orig");
            for(int row = 0; row < Height; ++row) {
                Inner_Orig<float>(&bayer[0], &rgb[0], row);
            }
        }
        {
            Timer t("omp_dynamic_orig");
            #pragma omp parallel for
            for(int row = 0; row < Height; ++row) {
                Inner_Orig<float>(&bayer[0], &rgb[0], row);
            }
        }
        {
            Timer t("omp_static_orig");
            #pragma omp parallel for schedule(static)
            for(int row = 0; row < Height; ++row) {
                Inner_Orig<float>(&bayer[0], &rgb[0], row);
            }
        }

        {
            Timer t("serial_unrolled");
            for(int row = 0; row < Height; ++row) {
                Inner_Unrolled<float>(&bayer[0], &rgb[0], row);
            }
        }
        {
            Timer t("omp_dynamic_unrolled");
            #pragma omp parallel for
            for(int row = 0; row < Height; ++row) {
                Inner_Unrolled<float>(&bayer[0], &rgb[0], row);
            }
        }
        {
            Timer t("omp_static_unrolled");
            #pragma omp parallel for schedule(static)
            for(int row = 0; row < Height; ++row) {
                Inner_Unrolled<float>(&bayer[0], &rgb[0], row);
            }
        }
        printf("-----------------------------\n");
    }
    return 0;
}

以下是我在三通道 8 路超线程 Core i7-950 机器上看到的时序:

serial_orig          :  0.13
omp_dynamic_orig     :  0.10
omp_static_orig      :  0.10
serial_unrolled      :  0.06
omp_dynamic_unrolled :  0.04
omp_static_unrolled  :  0.04

“静态”版本告诉编译器在循环入口处平均分配线程之间的工作。这避免了尝试进行工作窃取或其他动态负载平衡的开销。对于这个代码片段,它似乎没有什么不同,即使工作负载在线程之间是非常统一的。

于 2013-04-21T12:52:23.323 回答
0

可能会发生性能下降,因为您试图在“行”数量的内核上分配 for 循环,这将不可用,因此它再次变得像具有并行开销的顺序执行。

于 2013-04-15T11:02:23.850 回答
0

对并行 for 循环不太熟悉,但在我看来,争用在于内存访问。看来您的线程正在重叠访问相同的页面。

你能把你的数组访问分成与页面边界有点对齐的 4k 块吗?

于 2013-04-15T11:23:05.387 回答
0

在没有为串行代码优化 for 循环之前谈论并行性能是没有意义的。这是我的尝试(一些好的编译器可能能够获得类似的优化版本,但我宁愿不依赖它)

    parallel_for(0, Height, [=] (int row) noexcept
    {
        for (auto col=0, bayerindex=row*Width,
                  rgb0=3*bayerindex+offset[(row%2)*2],
                  rgb1=3*bayerindex+offset[(row%2)*2+1];
             col < Width; col+=2, bayerindex+=2, rgb0+=6, rgb1+=6 )
        {
            RgbChannel[rgb0] = BayerChannel[bayerindex  ];
            RgbChannel[rgb1] = BayerChannel[bayerindex+1];
        }
    }); 
于 2013-04-15T16:09:25.873 回答