1

我正在编写一个使用 WinAPI 的关键部分作为同步机制的多线程控制台应用程序。我需要创建 5 个线程,每个线程都有自己的字符串由线程显示。线程应该按顺序输出它们的字符串,一个接一个。

所以,我有线程功能:

void thread_routine(void* data)
{
    std::string* st = (std::string*)data;
    while(1)
    {
        /* enter critical section */
        std::cout << st; // not exactly that way, but I hope that you understood what I've meant.
        Sleep(100);
        /* leave critical section */
    }
}

而在我的main()

for(int i = 0; i < 10; ++i)
    _beginthread(thread_routine, 0, strings[i]);

好吧,我预计会看到类似[o1]的内容:

1) First thread
2) Second thread
3) Third thread
4) Fourth thread
5) Fifth thread
... and so on

但我看到的不是这个输出,而是[o2]

1) First thread
1) First thread
3) Third thread
2) Second thread
5) Fifth thread
... and so on

我清楚地理解会发生什么:线程通过关键部分的顺序是未知的,因此该部分将被随机捕获,但我需要同步我的线程以获得类似[o1]的输出。那我该怎么办?有没有图案?

4

6 回答 6

3

Critical sections are not the way to solve this problem. A critical section is nothing more than a mutual exclusion device. It's intended to ensure that only one thread can be executing a particular piece of code at a given time. It's not intended to be used for sequencing, nor is it particularly good for that task. The problem is that you can't wait on a critical section without acquiring it.

Consider a case in which Thread B has to wait for Thread A to finish before continuing with its work. If you use a critical section (call it cs1), then you have to ensure that Thread A acquires it before Thread B tries to acquire it. That means you have to make 100% sure that Thread A begins execution and acquires the critical section before Thread B can potentially acquire the critical section.

Event Objects, on the other hand, allow you to wait for an event or continue if the event has already occurred. So Thread B can wait on the event before Thread A even starts. And it will wait until Thread A (or somebody) sets the event. The problem you describe (thread B needing to wait for some event to happen) is exactly the type of thing that event objects were designed to solve.

Given 3 threads that process in parallel for some time, but at some point need to wait for some other event to happen, you'd write code to create and wait on the events. Briefly:

HANDLE event1_done = CreateEvent(...);
HANDLE event2_done = CreateEvent(...);
HANDLE event3_done = CreateEvent(...);

All events are manual reset, and their initial state is not signaled.

The main thread starts the three events. Then it waits on the third one to be finished:

WaitForSingleObject(event3_done, INFINITE);

The individual threads do their processing and wait on their respective events. Thread 1, of course, doesn't wait. It just does this:

// thread 1
// do processing
// then signal that it's finished
SetEvent(event1_done);

Thread 2 does its processing, waits on event1_done, and then sets event2_done:

// thread 2
// do processing
// wait for the first thread to complete
WaitForSingleObject(event1_done, INFINITE);
// do whatever
// and then signal that it's done
SetEvent(event2_done);

Thread 3 is just like Thread 2; only the event names have changed.

The difference between this method and using critical sections is that multiple threads could be waiting on the same event object. When that event is set, all of the threads waiting on that object are released. If you want only one of them released, then you'd use an auto reset event.

Also note that if you want to re-use those events, you'll need to reset them (call ResetEvent). When you do that is up to you.

于 2013-10-31T21:07:32.747 回答
2

您需要编写自己的调度程序。只是另一个以指定顺序唤醒您的线程的线程。在这种情况下,您必须将更复杂的数据传递给您的线程,包括一些可等待对象(即信号量)。我对 WinAPI 没有经验,这只是一个想法:

void scheduler_thread(void* data) {
  scheduler_data* threads = (scheduler_data*)data;
  int index = 0;
  while (true) {
    notify_waitable_object(threads->waitable_object[index]);
    sleep(timeout);
    index = (index + 1) % threads->thread_count;
  }
}

void thread_procedure(void* data) {
  some_thread_data* p = (some_thread_data*)data;
  while(true)
  {
    wait_for_notify(p->wait_object);
    std::cout << p->st;
  }  
}
于 2013-10-31T20:52:28.867 回答
1

你可能有一个设计问题,应该解决这个问题!

但是,如果您真的要同步线程,这是一种方法。可能会因为这非常低效并且如果任何线程跳过重要部分(例如通过try-catch)会死锁,但仍然是一种方法:

#include <thread>
#include <atomic>
#include <cstdlib>
#include <iostream>
#include <vector>

void myFunc(int idx,std::atomic<int> * counter){

    std::this_thread::sleep_for(std::chrono::milliseconds(std::rand()%100));
    // Don't know about memory_order stuff
    // If you want to use this approach, you should read about it.
    while(counter->load()!=idx){
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    std::cout << idx << " thread here\n";
    counter->store(idx+1);
}

int main(){
    std::srand(std::time(0));
    std::atomic<int> counter(0);
    std::vector<std::thread> thread_vector;
    for(int i=0;i<10;i++)
        thread_vector.push_back(std::thread(myFunc,i,&counter));

    for(int i=0;i<10;i++)
        thread_vector[i].join();
}
于 2013-10-31T20:40:53.753 回答
1

1)您不应该将 I/O 放在单独的线程中。main在线程完成处理后在线程中执行输出。

2)如果您需要线程顺序执行,则不需要线程。线程对于并行操作很有用。如果您需要按顺序执行某些操作,只需在单个线程(您的main线程)中执行即可。

3)如果你真的想这样做,你会想要创建一系列关键部分,每个线程都在等待不同的线程。那是,

  • 线程 2 正在等待线程 1 清除关键部分 1
  • 线程 3 正在等待线程 2 清除关键部分 2
  • 线程 4 正在等待线程 3 清除关键部分 3 等。

此外,您的data对象(传递到每个线程)不仅需要包含std::string您要打印的对象,还需要包含它必须等待和清除的关键部分的句柄(也就是说,您需要一个结构至少有 2 个临界区句柄和一个字符串)。演示该想法的不完整示例如下:

struct ThreadData
{
    std::string st;
    CRITICAL_SECTION* prevCS;
    CRITICAL_SECTION* nextCS;
};

class Mutex
{
public:
    Mutex(CRITICAL_SECTION* cs) : _cs(cs)
    {
        EnterCriticalSection(_cs);
    }

    ~Mutex()
    {
        LeaveCriticalSection(_cs);
    }
private:
    CRITICAL_SECTION* _cs;
};

void thread_routine(void* data)
{
    ThreadData* d = (ThreadData*)data;
    std::unique_ptr<Mutex> current; // this is the one my predecessor must clear
    std::unique_ptr<Mutex> next; // this is the one I have to clear
    if (d->nextCS)
    {
        next = std::make_unique<Mutex>(d->nextCS);
    }

    if (d->prevCS)
    {
        current = std::make_unique<Mutex>(d->prevCS);
    }

    std::cout << d->st << std::endl;
}

int main()
{
    CRITICAL_SECTION cs1;
    CRITICAL_SECTION cs2;

    InitializeCriticalSection(&cs1);  
    InitializeCriticalSection(&cs2);

    ThreadData td1;
    ThreadData td2;
    ThreadData td3;

    td1.nextCS = &cs1;
    td1.prevCS = nullptr;
    td1.st = "Thread 1";

    td2.nextCS = &cs2;
    td2.prevCS = &cs1;
    td2.st = "Thread 2";

    td3.nextCS = nullptr;
    td3.prevCS = &cs2;
    td3.st = "Thread 3";     

    _beginthread(thread_routine, 0, &td1);
    _beginthread(thread_routine, 0, &td2);
    _beginthread(thread_routine, 0, &td3);

   // NOTE:  you also need to add an event handle to wait for in the main thread.  It would go here   

    return 0;
}

替代解决方案

(可能会也可能不会满足您的任务需要):

您一次只能运行 1 个线程,并让您的主线程等待一个线程完成,然后再让下一个线程开始。也就是说,您创建了所有线程,然后将其置于挂起状态。然后 main 启动您的第一个线程并等待它完成,然后再启动第二个线程,并重复您拥有的线程数。根据要求的编写方式,这可能是避免教授作业愚蠢的一种聪明方法。

于 2013-10-31T20:25:43.340 回答
0

我创建了一个简单的生产者、消费者场景,其中我有一个函数“生产者”生产数据和 N 个(N 是可配置的)消费者,按顺序(严格顺序)消费数据,即第一个线程应该在第二个线程之前消费数据。和第 3 个线程之前的第 2 个线程,依此类推。当每个消费者消费数据时,它将再次从第一个线程开始。

为了按顺序运行它,我使用条件变量和“notify_one()”函数。

#include <iostream>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>

#define NUMBER_OF_THREADS 5
#define MAX_QUEUE_ELEM 500

std::mutex m;
std::condition_variable producerMutex;
std::condition_variable consumerMutex[NUMBER_OF_THREADS];
std::queue<int> Q;
static int elemCounter = 1;
static int threadCounter = 1;

void producer()
{
    while (true)
    {
        // lock thread
        std::unique_lock<std::mutex> lock(m);

        while (Q.size() == MAX_QUEUE_ELEM)
        {
            std::cout << "Producer waiting" << std::endl;
            producerMutex.wait(lock);
        }

        Q.push(elemCounter++);

        // unlock thread
        lock.unlock();

        // notify next waiting consumer thread
        consumerMutex[threadCounter].notify_one();
    }
}

void consumer()
{
    while (true)
    {
        //lock thread
        std::unique_lock<std::mutex> lock(m);

        while (Q.empty())
        {
            std::cout << "Consumer waiting : %d "<< threadCounter << std::endl;
            consumerMutex[threadCounter].wait(lock);
        }

        int val = Q.front();
        Q.pop();

        // Printing in lock to print in sequnce
        std::cout << "val: %d " << val << " , thread number: %d " << threadCounter << std::endl;

        // unloack thread
        lock.unlock();

        // Notify to next waiting thread in sequnce
        if (threadCounter == NUMBER_OF_THREADS)
        {
            // it means this is last thread so notify first thread
            threadCounter = 1;
            consumerMutex[threadCounter].notify_one();
        }
        else
        {
            consumerMutex[++threadCounter].notify_one();
        }
    }
}

int main()
{
    std::thread thrds[NUMBER_OF_THREADS];

    for (int i = 0; i < NUMBER_OF_THREADS; i++)
    {
        thrds[i] = std::thread(consumer);
    }

    producer();

    for (int i = 0; i < NUMBER_OF_THREADS; i++)
    {
        thrds[i].join();
    }

    return 0;
}
于 2017-03-26T14:21:08.920 回答
-1
/** For Scheduling the thread execution order, Evens are enough, no synchronization objects like Mutex, Critical Section etc.. are not needed. Below is a code and which is scalable based on the number of threads entered. */

//Code for Schedule threads execution in a order T1->T2->T3-> ..->Tn->T1..
// where T1 is Thread-1, T2 is Thread-2 .. Tn is Thread-N

#include <iostream>
#include <Windows.h>

HANDLE  * EventsHandlesArray;    
HANDLE * ThreadHandlesArray;

int iMaxWaitTime = 10000; // 10 Secs    
int iThreadWaitTimeInterval = 1000; //1 sec

int iNumberOfThread = 3; // You can change this no. of thread value within the permitted limit by your system :)

DWORD WINAPI ThreadFun( LPVOID lpParam )

{
    int iThreadIndex = reinterpret_cast<int> (lpParam);
    int iCurrentEVentIndex = iThreadIndex == 1 ? iNumberOfThread-1: iThreadIndex-2;
        int iNewEVentIndex = iThreadIndex -1;

    while(1)
    {
        switch(::WaitForSingleObject(EventsHandlesArray[iCurrentEVentIndex],iMaxWaitTime))
        {
            case WAIT_OBJECT_0:
                    std::cout<<" I am a thread "<<iThreadIndex<<std::endl;
                    ResetEvent ( EventsHandlesArray[iCurrentEVentIndex]);
                    Sleep (iThreadWaitTimeInterval);
                    SetEvent ( EventsHandlesArray[iNewEVentIndex]);
                break;

            case WAIT_FAILED:
                    std::cout<<"Error in thread"<<iThreadIndex<<" Thread wait time failed "<<std::endl;
                   ResetEvent ( EventsHandlesArray[iCurrentEVentIndex]);
                   SetEvent ( EventsHandlesArray[iCurrentEVentIndex]);
                break;

            case WAIT_TIMEOUT:
                    std::cout<<"Error in thread"<<iThreadIndex<<" Thread wait timed out"<<std::endl;
                    ResetEvent ( EventsHandlesArray[iCurrentEVentIndex]);
                   SetEvent ( EventsHandlesArray[iCurrentEVentIndex]);
                break;
        }
    }
}

int main(void)

{       
__try
    {
       std::cout<<"Running Main, Creating Events"<<std::endl;
       EventsHandlesArray = new HANDLE[iNumberOfThread];
       ThreadHandlesArray = new HANDLE[iNumberOfThread];

        for(int iCount = 0; iCount<iNumberOfThread; iCount++)
        {
            EventsHandlesArray[iCount] = CreateEvent ( NULL , true , false , NULL );
        }

        for(int iCount = 0; iCount<iNumberOfThread; iCount++)
        {
            if( EventsHandlesArray[iCount] == INVALID_HANDLE_VALUE)
            {
                std::cout<<"Problem with Event Creation"<<std::endl;
                for(int iCount =0;iCount<iNumberOfThread;iCount++)
                {
                    CloseHandle(EventsHandlesArray[iCount]); 
                }
                return 0;
            }
        }

        DWORD Id=0;
        int iThreadIndex = 1;

        for(int iCount = 0; iCount<iNumberOfThread; iCount++)
        {
            iThreadIndex = iCount+1;
            ThreadHandlesArray[iCount] = CreateThread ( NULL, 0, (LPTHREAD_START_ROUTINE)ThreadFun,(LPVOID)iThreadIndex ,CREATE_SUSPENDED,&Id );
            std::cout<<"Thread Created : "<<Id<<std::endl;
        }

        bool bThreadCreatedSuccessfully = true;

        for(int iCount = 0; iCount<iNumberOfThread; iCount++)
        {
            if( ThreadHandlesArray[iCount] == INVALID_HANDLE_VALUE || ThreadHandlesArray[iCount] == NULL)
            {
                bThreadCreatedSuccessfully = false;
                break;
            }
        }

        if(bThreadCreatedSuccessfully)
        {
            std::cout<<"Resuming Threads "<<std::endl;
            for(int iCount =0;iCount<iNumberOfThread;iCount++)
            {
                //Try to close the event handles
                ResumeThread(ThreadHandlesArray[iCount]);
            }

            Sleep (iThreadWaitTimeInterval);
            SetEvent ( EventsHandlesArray[iNumberOfThread-1]);
            WaitForMultipleObjects(iNumberOfThread,ThreadHandlesArray, TRUE, INFINITE);
        }
        else
        {
            std::cout<<"Issue with Thread Creation"<<std::endl;
        }
    }

    __finally 
    {
        //Close Threads & Events
        for(int iCount=0;iCount<iNumberOfThread;iCount++)
        {
            //Try to close the event handles
            CloseHandle(ThreadHandlesArray[iCount]);
            CloseHandle(EventsHandlesArray[iCount]);
        }
        std::cout<<"  .... Exiting the program"<<std::endl;
    }
    return 0;
}
于 2018-04-03T12:33:45.290 回答