2

我遇到了一个相当奇怪的 openMP 问题。

任务是获取一个字符串向量并将每个元素拆分为其包含的 k-Mers(所有包含的长度为 k 的子字符串)。这应该沿着向量的元素微不足道地并行化,因为 k-Merification 过程对于每个元素都是独立发生的。我想将结果存储在 map/set STL 数据结构 ( std::map<long long, std::map<std::string, std::set<unsigned int> > > local_forReturn) 中,并为此分配了一个线程局部变量。

然而,实现的并行化行为却出人意料地糟糕——Linux 上的 top 显示 CPU 使用率约为 200%,尽管在 40 核机器上运行 40 个线程。(我已经测试过该#omp critical部分不是瓶颈)。

我的预感是这可能与错误共享有关,因为我的本地化地图/集 STL 类中包含的实际数据最终会出现在堆上。但是,我既不知道如何测试我的直觉,也不知道如何减少 STL 构造的错误共享(如果这是问题所在)。我将不胜感激任何想法!

完整代码:

#include <string>
#include <assert.h>
#include <set>
#include <map>
#include <vector>
#include <omp.h>
#include <iostream>

int threads = 40;
int k = 31;

std::string generateRandomSequence(int length);
char randomNucleotide();
std::vector<std::string> partitionStringIntokMers(std::string str, int k);

int main(int argc, char *argv[])
{
    // generate test data
    std::vector<std::string> requiredSEQ;
    for(unsigned int i = 0; i < 10000; i++)
    {
        std::string seq = generateRandomSequence(20000);
        requiredSEQ.push_back(seq);
    }

    // this variable will contain the final result
    std::map<long long, std::map<std::string, std::map<unsigned int, int> > > forReturn;

    omp_set_num_threads(threads);

    std::cerr << "Data generated, now start parallel processing\n" << std::flush;

    // split workload (ie requiredSEQ) according to number of threads
    long long max_i = requiredSEQ.size() - 1;
    long long chunk_size = max_i / threads;
    #pragma omp parallel
    {
        assert(omp_get_num_threads() == threads);
        long long thisThread = omp_get_thread_num();
        long long firstPair = thisThread * chunk_size;
        long long lastPair = (thisThread+1) * chunk_size - 1;
        if((thisThread == (threads-1)) && (lastPair < max_i))
        {
            lastPair = max_i;
        }

        std::map<long long, std::map<std::string, std::map<unsigned int, int> > > local_forReturn;

        for(long long seqI = firstPair; seqI <= lastPair; seqI++)
        {
            const std::string& SEQ_sequence = requiredSEQ.at(seqI);

            const std::vector<std::string> kMersInSegment = partitionStringIntokMers(SEQ_sequence, k);
            for(unsigned int kMerI = 0; kMerI < kMersInSegment.size(); kMerI++)
            {
                const std::string& kMerSeq = kMersInSegment.at(kMerI);
                local_forReturn[seqI][kMerSeq][kMerI]++;
            }   
        }

        #pragma omp critical
        {
            forReturn.insert(local_forReturn.begin(), local_forReturn.end());
        }
    }

    return 0;   
}

std::string generateRandomSequence(int length)
{
    std::string forReturn;
    forReturn.resize(length);
    for(int i = 0; i < length; i++)
    {
        forReturn.at(i) = randomNucleotide();
    }
    return forReturn;
}

char randomNucleotide()
{
    char nucleotides[4] = {'A', 'C', 'G', 'T'};
    int n = rand() % 4;
    assert((n >= 0) && (n <= 3));
    return nucleotides[n];
}


std::vector<std::string> partitionStringIntokMers(std::string str, int k)
{
    std::vector<std::string> forReturn;
    if((int)str.length() >= k)
    {
        forReturn.resize((str.length() - k)+1); 
        for(int i = 0; i <= (int)(str.length() - k); i++)
        {
            std::string kMer = str.substr(i, k);
            assert((int)kMer.length() == k);
            forReturn.at(i) = kMer;
        }
    }
    return forReturn;
}
4

0 回答 0