6

我发现自己经常编写以下形式的 C++ 代码:

while (getline(strm, line)) {
    cout << computationally_intensive_function(line) << endl;
}

我想并行化这段代码。到目前为止,我提出的最佳解决方案是构建字符串向量以容纳大量(10000-100000)行,然后在该向量上并行化

#pragma omp parallel for

然后清空向量并在剩余线条时重复。但是,这种方法需要大量内存,并且在主进程正在缓冲字符串时,其他内核处于空闲状态。有没有更好的办法?像 Pythonmultiprocessing.Pool.map或 Hadoop 之类的东西?(不过,我想避免使用 Hadoop 的 C++ API,因为 Hadoop 相当重量级,可能不会安装在我的代码运行的任何地方。)

4

2 回答 2

5

OpenMP 3.0 tasks存在一个不为人知的功能,这是非常不幸的,因为它们是专门为涵盖此类情况而创建的。如果您的编译器支持该标准版本,那么您绝对应该使用 OpenMP 任务。但请记住,从多个线程写入stdout(或std::cout)通常会严重混合它们的输出,您很可能希望对其进行同步:

#pragma omp parallel
{
    #pragma omp master
    while (getline(strm, line))
    #pragma omp task
    {
        result_type result = computationally_intensive_function(line);
        #pragma omp critical
        {
            cout << result << endl;
            cout.flush();
        }
    }
    #pragma omp taskwait
}

我让你来决定应该是什么变量shared以及应该是什么private

于 2012-05-21T05:49:39.743 回答
1

您应该将计算与从文件中读取行重叠。一种好方法是使用 Threading Building Blocks 管道算法。您所做的是指定三个(基于您在伪代码示例中显示的内容)过滤器,两个串行一个和一个并行。串行过滤器是输入和输出过滤器。第一个从文件中逐行读取数据并将每一行传递给第二个过滤器,该过滤器是并行的,并以多线程模式运行您的计算/处理功能。最后一级/过滤器也是串行的,它确实输出。我正在复制粘贴 TBB 教程中的示例,这似乎正是您想要实现的目标:

// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the
C++ declaration
represents only the header of a much larger object in memory. */
class TextSlice {
    // Pointer to one past last character in sequence
    char* logical_end;
    // Pointer to one past last available byte in sequence.
    char* physical_end;
public:
    // Allocate a TextSlice object that can hold up to max_size characters.
    static TextSlice* allocate( size_t max_size ) {
        // +1 leaves room for a terminating null character.
        TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate(sizeof(TextSlice)+max_size+1 );
        t->logical_end = t->begin();
        t->physical_end = t->begin()+max_size;
        return t;
    }
    // Free this TextSlice object
    void free() {
        tbb::tbb_allocator<char>().deallocate((char*)this,
        sizeof(TextSlice)+(physical_end-begin())+1);
    }
    // Pointer to beginning of sequence
    char* begin() {return (char*)(this+1);}
    // Pointer to one past last character in sequence
    char* end() {return logical_end;}
    // Length of sequence
    size_t size() const {return logical_end-(char*)(this+1);}
    // Maximum number of characters that can be appended to sequence
    size_t avail() const {return physical_end-logical_end;}
    // Append sequence [first,last) to this sequence.
    void append( char* first, char* last ) {
        memcpy( logical_end, first, last-first );
        logical_end += last-first;
    }
    // Set end() to given value.
    void set_end( char* p ) {logical_end=p;}
};

让它运行的功能是:

void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    tbb::parallel_pipeline(
    ntoken,
    tbb::make_filter<void,TextSlice*>(
    tbb::filter::serial_in_order, MyInputFunc(input_file) )
    &
    tbb::make_filter<TextSlice*,TextSlice*>(
    tbb::filter::parallel, MyTransformFunc() )
    &
    tbb::make_filter<TextSlice*,void>(
    tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
}
于 2012-05-21T10:47:27.243 回答