1

我的应用程序中有 4 个线程。一个是主线程,另外三个是工作线程。我希望这 3 个工作线程中的前 2 个生成数据,第三个在生成数据时将其写入。数据生成器线程应该同步以使其并行运行(同时开始“for”循环的每次迭代)。如果 CPU 足够快,写入器线程应该一直在写入。我不知道如何在 C++ 中专业地同步所有这 3 个线程,所以我编写了代码,就像有 ' __syncthreads()' 函数来代表我的意思最好的方式。__syncthreads()传统 C++ 中是否有相当于 CUDA C ' ' 的东西?如果不是,那么如何以我想要的方式最佳地实现同步?(我不喜欢while代码中的那些循环。

volatile bool write_flag ;

int main(int argc, char **argv)
{
    ...
    write_flag = false ; // nothing to write at the beginning
    ...
    HANDLE *trdHandles = new HANDLE[WORKING_THREADS] ;
    int IDs[] = {0, 1} ; // IDs for generator threads

    trdHandles[0] = CreateThread(NULL, 0, generator, &IDs[0], 0, NULL) ;  // 1st data generator thread
    if(trdHandles[0] == NULL)
    ExitProcess(0) ;
    trdHandles[1] = CreateThread(NULL, 0, generator, &IDs[1], 0, NULL) ;  // 2nd data generator thread
    if(trdHandles[1] == NULL)
    ExitProcess(0) ;

    trdHandles[2] = CreateThread(NULL, 0, writer, f_out, 0, NULL) ;  // writer thread
    if(trdHandles[2] == NULL)
    ExitProcess(0) ;
    ...
}

WINAPI DWORD generator(LPVOID lpParam)
{
    int *ID = static_cast<int*>(lpParam) ;
    dataGen(*ID) ;
    return 0 ;
}

void dataGen(int id)
{
    ...
    for(int aa = 0; aa < cycles; aa++)
    {
        __syncthreads() ;

        ... // both threads generate data here in parallel

        while(write_flag) // don't generate data too fast. Wait for writes to complete (this flag is initially set to 'false')
        ;
        setBuffers(id, aa) ; // for swapping in/out buffers
        if(id == 0) // only one thread needs to set the flag
        write_flag = true ;
     }
}

WINAPI DWORD writer(LPVOID lpParam)
{
    ofstream *f_out = static_cast<ofstream*>(lpParam) ;
    while(1)
    {
        if(write_flag)
        {
            f_out->write(out_buffer0, chunk_len) ;
            f_out->write(out_buffer1, chunk_len) ;
            write_flag = false ;
            if(!finish)
            continue ;
            else
            return 0 ;
        }
    }
}
4

3 回答 3

3

查找The Little Book Of Semaphores 3.5 节中描述的屏障模式的实现。

正如您所描述的,屏障模式用于同步线程。

于 2013-10-26T16:09:44.150 回答
0

“The Little Book Of Semaphores”这本书还不错,但是它主要集中在编程方面,而不是像我预期的那样只关注 C++。但是这本书帮助了我,因为我发现详细的 C++ 障碍模式解释比没有它更快。读完后:

http://adilevin.wordpress.com/2009/06/04/locking-mechanisms/

和这个:

http://adilevin.wordpress.com/category/multithreading/(屏障主要功能部分)

我只需要花一点时间来解决我的问题。我通过使用一个bool标志、Semaphore对象和主要WaitForSingleObject()调用的某种组合来解决它,如下面的代码所示。我确信它可以工作,因为在运行时没有断言错误。它是类似于我的应用程序代码的完整代码,但这仅代表我如何解决问题。如果您对此代码有任何建议 - 如果可以以更好的方式实现,请回答。

#include <iostream>
#include <conio.h>
#include <stdio.h>
#include <windows.h>
#include <sstream>
#include <cassert>

#define THREADS_NUM 3

WINAPI DWORD generator(LPVOID lpParam) ;
WINAPI DWORD writer(LPVOID lpParam) ;
void dataGen(int id) ;
void locker() ;
void sync_msg_display(std::string msg) ;

volatile bool write_flag = false, finish = false ;
volatile long entered_num ;
int time0 = 950, time1 = 1050, time2 = 550 ;

HANDLE sem0, sem1, sem2 ;

using namespace std ;

int main(void)
{
    HANDLE trdHandles[THREADS_NUM] ;
    int IDs[THREADS_NUM] ;

    for(int aa = 0; aa < THREADS_NUM; aa++)
    IDs[aa] = aa ;

    entered_num = 0 ;
    sem0 = CreateSemaphore(NULL, 0, 4096, NULL) ;

    for(int aa = 0; aa < THREADS_NUM - 1; aa++)
    trdHandles[aa] = CreateThread(NULL, 0, generator, &IDs[aa], 0, NULL) ;
    trdHandles[THREADS_NUM - 1] = CreateThread(NULL, 0, writer, &IDs[THREADS_NUM - 1], 0, NULL) ;

    for(int aa = 0; aa < THREADS_NUM; aa++)
    if(trdHandles[aa] == NULL)
    ExitProcess(0) ;

    WaitForMultipleObjects(THREADS_NUM, trdHandles, true, INFINITE) ;
    for(int aa = 0; aa < THREADS_NUM; aa++)
    CloseHandle(trdHandles[aa]) ;

    CloseHandle(sem0) ;
    CloseHandle(sem1) ;
    CloseHandle(sem2) ;

    cout << "finished !" << endl ; 

    getch() ;
    return 0 ;
}

WINAPI DWORD generator(LPVOID lpParam)
{
int id = *(static_cast<int*>(lpParam)) ;
dataGen(id) ;
return 0 ;
}

WINAPI DWORD writer(LPVOID lpParam)
{
    LONG prev ;
    sem1 = CreateSemaphore(NULL, 0, 4096, NULL) ;
    sem2 = CreateSemaphore(NULL, 0, 4096, NULL) ;
    while(1)
    {
        WaitForSingleObject(sem1, INFINITE) ;
        write_flag = true ;

        sync_msg_display("Write started.") ;
        Sleep(time2) ;
        sync_msg_display("Write finished.") ;

        write_flag = false ;
        ReleaseSemaphore(sem2, 2, &prev) ;
        if(finish)
        return 0 ;
    }
}

void dataGen(int id)
{
    LONG prev ;
    stringstream ss ;
    for(int aa = 0; aa < 20; aa++)
    {
        if(id == 0)
        {
            ss << aa ;
            sync_msg_display("Generator thread no. 0 iteration no. " + ss.str() + " start.") ;
            ss.str("") ;
            if(aa % 2)
            Sleep(time0) ;
            else
            Sleep(time1) ;
            ss << aa ;
            sync_msg_display("Generator thread no. 0 iteration no. " + ss.str() + " complete.") ;
            ss.str("") ;
        }
        else
        {
            ss << aa ;
            sync_msg_display("Generator thread no. 1 iteration no. " + ss.str() + " start.") ;
            ss.str("") ;
            if(aa % 2)
            Sleep(time1) ;
            else
            Sleep(time0) ;
            ss << aa ;
            sync_msg_display("Generator thread no. 1 iteration no. " + ss.str() + " complete.") ;
            ss.str("") ;
        }

        if(write_flag) // don't generate data too fast. Wait for writes to complete (this flag is initially set to 'false')
        WaitForSingleObject(sem2, INFINITE) ;

        assert(!write_flag) ;
        Sleep(10) ; ////
        assert(!write_flag) ;

        locker() ;

        if(id == 0) // only one thread needs to set the flag
        ReleaseSemaphore(sem1, 1, &prev) ;
    }
    locker() ;
    if(id == 0)
    finish = true ;
}

void locker()
{
    LONG prev ;
    if(InterlockedIncrement(&entered_num) < 2)
    WaitForSingleObject(sem0, INFINITE) ;
    else
    {
        entered_num = 0 ;
        ReleaseSemaphore(sem0, 1, &prev) ;
    }
}

void sync_msg_display(string msg)
{
    HANDLE lock = CreateMutex(NULL, FALSE, "mutex") ;
    WaitForSingleObject(lock, INFINITE) ;
    cout << msg << endl ;
    ReleaseMutex(lock) ;
    CloseHandle(lock) ;
}
于 2013-10-27T10:29:46.080 回答
0

C++ 不直接支持多线程(直到 C++11)。您必须使用操作系统服务来实现多线程和同步。在 Windows 上,有一组丰富的同步功能。对于您的方案,请查看Wait FunctionsEvent FunctionsSetEvent两者的结合WaitForMultipleObjects将是一个可行的解决方案。

于 2013-10-27T01:17:16.107 回答