0

我正在编写一个应用程序,它将来自不同线程的一些数据写入 xml 文件。我尝试使用事件核心对象同步它,但在文件中我得到错误的数据。我得到下一个结果

<file path="somePath" />
<file path="somePath" <file path="somePath" /> />....

但我希望得到

<file path="somePath" />
<file path="somePath" />
<file path="somePath" />

请参阅下面我的伪代码。其中有什么问题?

unsigned int WINAPI MyThread(void *p)
{
    std::wofstream outstr;
    outstr.open("indexingtest.xml", std::ios::app);
    do
    {
        if(somePredicat1)
        {
            WaitForSingleObject(hEvent, INFINITE);
            outstr <<"<file path=\""<< sFileName << "\"\n";
            outstr <<"\tsize=\""<< fileSize << "\" />\n";           
            ReleaseMutex(hMutex);
        }
         if(somePredicat3)
         {
             MyThread(sFileName);
         }
    }while(somePredicat2);
    outstr.close();
    FindClose( hSearch );
    return 0;
}

int _tmain(int argc, TCHAR *argv[])
{
    hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    //hMutex = CreateMutex(NULL, FALSE, 0);
    unsigned int ThreadID;
    HANDLE hThread1 = (HANDLE)_beginthreadex(NULL, 0, MyThread, L"D:\\*", 0, &ThreadID);
    HANDLE hThread2 = (HANDLE)_beginthreadex(NULL, 0, MyThread, L"C:\\*", 0, &ThreadID);
    SetEvent(hEvent);
    std::wcout << "\a" << std::endl;
    WaitForSingleObject( hThread1, INFINITE );
    return 0;
}

更具体的代码

HANDLE hMutex = CreateMutex(NULL,FALSE, 0);
wchar_t** GetAllFilesImpl( wchar_t const* folder, wchar_t** res, size_t* pAllocated, size_t* pUsed )
{
    HANDLE hSearch;
    WIN32_FIND_DATAW fileinfo;
    size_t allocatedMemory = 0;


    hSearch = FindFirstFileW( folder, &fileinfo );
    if( hSearch != INVALID_HANDLE_VALUE ) {
        do {

            wchar_t* sFileName, ** tmp, sTmp[ 1024 ];
            long fileSize = 0;
            long creationDate;
            /* ignore ., .. */
            if( !wcscmp(fileinfo.cFileName, L".") ||
                !wcscmp(fileinfo.cFileName, L"..") )
                continue;
            sFileName = PathCreator( folder, fileinfo.cFileName );
            fileSize = fileinfo.nFileSizeLow;
            creationDate = fileinfo.ftCreationTime.dwHighDateTime;


            if(fileSize)
            {
                WaitForSingleObject(hMutex, INFINITE);
                std::wofstream outstr;
                            outstr.open("indexingtest.xml", std::ios::app);
                outstr.seekp(std::ios_base::end);
                outstr <<"<file path=\""<< sFileName << "\"\n";
                outstr <<"\tsize=\""<< fileSize << "\" />\n";
                outstr.seekp(std::ios_base::end);
                outstr.close();
                wprintf( L"%s\n", sFileName);
                ReleaseMutex(hMutex);
            }

            tmp = AddToArray( res, pAllocated, pUsed, sFileName );
            if( !tmp ) return FreeAllFilesMemory(res), NULL;
            res = tmp;

            if( fileinfo.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY ) {
                wcscpy_s( sTmp, sFileName );
                wcscat_s( sTmp, L"\\*" );
                tmp = GetAllFilesImpl( sTmp, res, pAllocated, pUsed );
                if( !tmp ) return NULL;
                res = tmp;
            }
        } while( FindNextFileW(hSearch, &fileinfo) );

        FindClose( hSearch );
    }
    return res;
}

unsigned int WINAPI GetAllFiles( void* folder )
{
    size_t nAllocated = 0, nUsed = 0;
    wchar_t** res = GetAllFilesImpl( (wchar_t *)folder, NULL, &nAllocated, &nUsed );
    if( res ) {
        /* to indicate end of result add a NULL string */
        wchar_t** tmp = AddToArray( res, &nAllocated, &nUsed, NULL );
        if( !tmp ) return FreeAllFilesMemory(res), -1;
        res = tmp;
    }
    std::wcout << "\a" << std::endl;
    return 0;
}
int _tmain(int argc, TCHAR *argv[])
{

    Sleep(1000);
    unsigned int ThreadID;
    HANDLE hThreads[3];
    hThreads[0] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"D:\\*", 0, &ThreadID);
    hThreads[1] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"C:\\Users\\Andrew\\Desktop\\*", 0, &ThreadID);
    hThreads[2] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"E:\\*", 0, &ThreadID);
    unsigned int dw = WaitForMultipleObjects(3, hThreads, TRUE, INFINITE);
    CloseHandle(hFile);
    printf("finished\n");
    return 0;
}
4

2 回答 2

2

您遇到的最大问题是每个线程都单独打开文件。而是在创建线程之前打开文件,然后使用互斥锁同步对文件的写入。

在伪代码中:

std::wofstream output_file;

void my_thread()
{
    do
    {
        if (some_condition)
        {
            lock_mutex();
            do_output();
            unlock_mutex();
        }
    } while (condition);
}

int main()
{
    output_file.open(...);

    create_thread();
    create_thread();

    output_file.close();
}
于 2012-10-18T12:24:59.250 回答
0
  1. 您应该在结束代码之前等待所有线程

    HANDLE aThread[2];
    
    ...
    
    aThread[0] =  (HANDLE)_beginthreadex(...
    aThread[1] =  (HANDLE)_beginthreadex(...
    
    WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
    
  2. 在进行输出之前,您正在等待事件。完成输出后,您释放互斥锁。这根本不符合逻辑。您应该等待互斥锁和事件。设置事件后,两个线程都将等待释放。因此,互斥锁不做任何事情。当您将事件句柄和互斥量句柄放入一个数组时,您也可以使用 WaitForMultipleObjects 来实现此目的:

    HANDLE hVarious[2];
    hVarious[0] = CreateEvent(NULL, TRUE, FALSE, NULL);
    // Note: this is a manual reset event. 
    // Thus is stays set until explicitly reset
    
    hVarious[1] = CreateMutex(NULL, FALSE, 0);
    
    // and now start the threads:
    aThread[0] =  (HANDLE)_beginthreadex(...
    aThread[1] =  (HANDLE)_beginthreadex(...
    
    // and set the event:
    SetEvent(hEvent);
    
    
    WaitForMultipleObjects(2, aThread, TRUE, INFINITE);
    

    然后线程应如下所示:

    unsigned int WINAPI MyThread(void *p)
    {
      do
      {
        if(somePredicat1)
        {
          // wait for the mutex AND the event
          WaitForMultipleObjects(2, hVarious, TRUE, INFINITE);
    
          // do the file stuff in the mutex protected part       
          std::wofstream outstr;
          outstr.open("indexingtest.xml", std::ios::app);
    
          outstr <<"<file path=\""<< sFileName << "\"\n";
          outstr <<"\tsize=\""<< fileSize << "\" />\n";           
    
          outstr.close();
          FindClose( hSearch );
          ReleaseMutex(hVarious[1]);
        }
      }while(somePredicat2);
      return 0;
    }
    

请记住:互斥锁的建立是为了保护并发应用程序中的资源。

我不知道somePredicat1somePredicat1。这些参数在不同线程中使用时也可能会出现问题。但是,您观察到的错误输出是由错误的互斥锁使用引起的。

评论后编辑:

    if(somePredicat3)
    {
         MyThread(sFileName);
    }

一个。线程本身作为函数调用而不关闭文件。

湾。您应该提供有关用途的更多详细信息somePredicat3, somePredicat2, and somePredicat1

C。您必须使用某种排他性来保护输出文件,因为它被多个线程使用。您也可以使用临界区对象来执行此操作。

于 2012-10-18T12:39:40.143 回答