0

I'm benchmarking matrix multiplication performance for serial and Cilk array notation versions. The Cilk implementation takes nearly twice as long as the serial and I don't understand why.

This is for single core execution

This is the meat of the Cilk multiplication. Due to rank restrictions, I must store each multiplication in an array then __sec_reduce_add this array before setting the destination matrix element value.

int* sum = new int[VEC_SIZE];
for (int r = 0; (r < rowsThis); r++) {
    for (int c = 0; (c < colsOther); c++) {
        sum[0:VEC_SIZE] = myData[r][0:VEC_SIZE] * otherData[0:VEC_SIZE][c]; 
        product.data[r][c] = __sec_reduce_add(sum[0:VEC_SIZE]);
    }
}

I understand caching concerns and don't see any reason for the Cilk version to have fewer cache hits than the serial because they both access a column array that is hopefully in cache along with a series of misses for row elements.

Is there an obvious dependency that I'm overlooking or syntactic element of Cilk that I should be using? Should I be using Cilk in a different way to achieve maximum performance for non-block matrix multiplication using SIMD operations?

I'm very new to Cilk so any help/suggestion is welcome.

EDIT:
Here's the serial implementation:

for (int row = 0; (row < rowsThis); row++) {
    for (int col = 0; (col < colsOther); col++) {
        int sum = 0;
        for (int i = 0; (i < VEC_SIZE); i++)  {
            sum += (matrix1[row][i] * matrix2[i][col]);
        }
        matrix3[row][col] = sum;
    }
}

Memory is (de)allocated appropriately and multiplication results are correct for both implementations. Matrix sizes are not known at compile time and a variety are used throughout the benchmark.

Compiler: icpc (ICC) 15.0.0 20140723
Compile flags: icpc -g Wall -O2 -std=c++11

Ignore the use of allocated memory, conversion from vectors to vanilla arrays, etc. I hacked apart another program to run this on a hunch assuming it'd be simpler than it in fact turned out to be. I couldn't get the compiler to accept cilk notation on both dimensions of the 2D vectors so I decided to use traditional arrays for the sake of the benchmark.

Here are all the appropriate files:
MatrixTest.cpp

#include <string>
#include <fstream>
#include <stdlib.h>
#include "Matrix.h"


#define MATRIX_SIZE 2000
#define REPETITIONS 1

int main(int argc, char** argv) {

    auto init = [](int row, int col) { return row + col; };

    const Matrix matrix1(MATRIX_SIZE, MATRIX_SIZE, init);
    const Matrix matrix2(MATRIX_SIZE, MATRIX_SIZE, init);
    for (size_t i = 0; (i < REPETITIONS); i++) {
        const Matrix matrix3 = matrix1 * matrix2;
        std::cout << "Diag sum: " << matrix3.sumDiag() << std::endl;
    }    
    return 0;
}

Matrix.h

#ifndef MATRIX_H
#define MATRIX_H

#include <iostream>
#include <functional>
#include <vector>

using TwoDVec = std::vector<std::vector<int>>;

class Matrix {                                                                                                                                                                     
public:                                                                                                                                                                            
    Matrix();                                                                                                                                                                      
    ~Matrix();                                                                                                                                                                     
    Matrix(int rows, int cols);                                                                                                                                                    
    Matrix(int rows, int cols, std::function<Val(int, int)> init);                                                                                                                 
    Matrix(const Matrix& src);                                                                                                                                                     
    Matrix(Matrix&& src);                                                                                                                                                          
    Matrix operator*(const Matrix& src) const throw(std::exception);                                                                                                               
    Matrix& operator=(const Matrix& src);                                                                                                                                          
    int sumDiag() const;                                                                                                                                                           

 protected:                                                                                                                                                                        
    int** getRawData() const;                                                                                                                                                      
 private:                                                                                                                                                                          
    TwoDVec data;                                                                                                                                                                  
};

#endif

Matrix.cpp

#include <iostream>
#include <algorithm>
#include <stdexcept>
#include "Matrix.h"

#if defined(CILK)

Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
    int rowsOther = other.data.size();
    int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
    int rowsThis = data.size();
    int colsThis = rowsThis > 0 ? data[0].size() : 0;
    if (colsThis != rowsOther) {
        throw std::runtime_error("Invalid matrices for multiplication.");
    }

    int** thisRaw = this->getRawData();  // held until d'tor
    int** otherRaw = other.getRawData();

    Matrix product(rowsThis, colsOther);

    const int VEC_SIZE = colsThis;

    for (int r = 0; (r < rowsThis); r++) {
        for (int c = 0; (c < colsOther); c++) {
            product.data[r][c] = __sec_reduce_add(thisRaw[r][0:VEC_SIZE]
                                             * otherRaw[0:VEC_SIZE][c]);
        }
    }
    delete[] thisRaw;
    delete[] otherRaw;

    return product;
}

#elif defined(SERIAL)

Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
    int rowsOther = other.data.size();
    int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
    int rowsThis = data.size();
    int colsThis = rowsThis > 0 ? data[0].size() : 0;
    if (colsThis != rowsOther) {
        throw std::runtime_error("Invalid matrices for multiplication.");
    }

    int** thisRaw = this->getRawData();  // held until d'tor
    int** otherRaw = other.getRawData();

    Matrix product(rowsThis, colsOther);

    const int VEC_SIZE = colsThis;

    for (int r = 0; (r < rowsThis); r++) {
        for (int c = 0; (c < colsOther); c++) {
            int sum = 0;
            for (int i = 0; (i < VEC_SIZE); i++) {
                sum += (thisRaw[r][i] * otherRaw[i][c]);
            }
            product.data[r][c] = sum;
        }
    }
    delete[] thisRaw;
    delete[] otherRaw;

    return product;
}

#endif

//  Default c'tor
Matrix::Matrix()
    : Matrix(1,1) { }

Matrix::~Matrix() { }

// Rows/Cols c'tor
Matrix::Matrix(int rows, int cols)
    : data(TwoDVec(rows, std::vector<int>(cols, 0))) { }

//  Init func c'tor
Matrix::Matrix(int rows, int cols, std::function<int(int, int)> init)
    : Matrix(rows, cols) {
    for (int r = 0; (r < rows); r++) {
        for (int c = 0; (c < cols); c++) {
            data[r][c] = init(r,c);
        }
    }
}

//  Copy c'tor
Matrix::Matrix(const Matrix& other) {
    int rows = other.data.size();
    int cols = rows > 0 ? other.data[0].size() : 0;
    this->data.resize(rows, std::vector<int>(cols, 0));
    for(int r = 0; (r < rows); r++) {
        this->data[r] = other.data[r];
    }
}


//  Move c'tor
Matrix::Matrix(Matrix&& other) {
    if (this == &other) return;
    this->data.clear();
    int rows = other.data.size();
    for (int r = 0; (r < rows); r++) {
        this->data[r] = std::move(other.data[r]);
    }
}


Matrix&
Matrix::operator=(const Matrix& other) {
    int rows = other.data.size();
    this->data.resize(rows, std::vector<int>(0));
    for (int r = 0; (r < rows); r++) {
        this->data[r] = other.data[r];
    }
    return *this;
}

int
Matrix::sumDiag() const {
    int rows = data.size();
    int cols = rows > 0 ? data[0].size() : 0;

    int dim = (rows < cols ? rows : cols);
    int sum = 0;
    for (int i = 0; (i < dim); i++) {
        sum += data[i][i];
    }
    return sum;
}


int**
Matrix::getRawData() const {
    int** rawData = new int*[data.size()];
    for (int i = 0; (i < data.size()); i++) {
        rawData[i] = const_cast<int*>(data[i].data());
    }
    return rawData;
}
4

1 回答 1

1

[2015-3-30 更新以匹配长代码示例。]

icc 可能正在自动矢量化您的累积循环,因此 Cilk Plus 正在与矢量化代码竞争。这里有两个可能的性能问题:

  1. 临时数组总和增加了加载和存储的数量。串行代码仅执行两次(SIMD)加载,并且每次(SIMD)乘法几乎没有存储。使用临时数组,每次乘法有 3 次加载和 1 次存储。

  2. matrix2 具有非单元跨步访问模式(也在串行代码中)。典型的当前硬件在单位步幅访问中工作得更好。

要解决问题 (1),请消除临时数组,就像您稍后在更长的样本中所做的那样。例如:

for (int r = 0; (r < rowsThis); r++) {
    for (int c = 0; (c < colsOther); c++) {
        product.data[r][c] = __sec_reduce_add(thisRaw[r][0:VEC_SIZE] * otherRaw[0:VEC_SIZE][c]);
    }
}

结果的等级__sec_reduce_add为零,因此可以分配给标量。

更好的是,解决步幅问题。除非结果矩阵非常宽,否则一个好的方法是按行累积结果,如下所示:

for (int r = 0; (r < rowsThis); r++) {
    product.data[r].data()[0:colsOther] = 0;
    for (int k = 0; (k < VEC_SIZE); k++) {
        product.data[r].data()[0:colsOther] +=thisRaw[r][k] * otherRaw[k][0:colsOther];
    }
}

注意data()这里的使用。数组表示法目前不允许将节表示法与重载[]的 from一起使用std::vector。所以我曾经data()获得一个指向底层数据的指针,我可以将它与数组表示法一起使用。

现在数组部分都有单位步长,因此编译器可以有效地使用向量硬件。上面的方案通常是我最喜欢的构建无阻塞矩阵乘法的方法。当使用icpc -g -Wall -O2 -xHost -std=c++11 -DCILKi7-4770 处理器编译并运行时,我看到 MatrixTest 程序时间从大约 52 秒下降到 1.75 秒,性能提高了 29 倍。

product通过消除归零(在构造时已经完成)和消除临时指针数组的构造,可以简化代码并加快速度。这是修改后的运算符*的完整定义:

Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
    int rowsOther = other.data.size();
    int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
    int rowsThis = data.size();
    int colsThis = rowsThis > 0 ? data[0].size() : 0;
    if (colsThis != rowsOther) {
        throw std::runtime_error("Invalid matrices for multiplication.");
    }

    Matrix product(rowsThis, colsOther);

    for (int r = 0; (r < rowsThis); r++) {
        product.data[r].data()[0:colsOther] = 0;
        for (int k = 0; (k < colsThis); k++) {
            product.data[r].data()[0:colsOther] += (*this).data[r][k] * other.data[k].data()[0:colsOther];
        }
    }

    return product;
}

Matrix如果有计算方法,那么长线会更短data[i].data()

于 2015-03-24T19:23:45.583 回答