2

(简而言之:main() 的 WaitForSingleObject 挂在下面的程序中)。

我正在尝试编写一段代码来分派线程并等待它们在恢复之前完成。我没有每次都创建线程,这很昂贵,而是让它们进入睡眠状态。主线程在 CREATE_SUSPENDED 状态下创建 X 个线程。

同步是使用以 X 作为最大计数的信号量完成的。信号量的计数器被归零,线程被分派。线程执行一些愚蠢的循环并在它们进入睡眠之前调用 ReleaseSemaphore。然后主线程使用WaitForSingleObject X 次来确保每个线程都完成了它的工作并且正在休眠。然后它循环并再次执行所有操作。

有时程序不会退出。当我对程序进行喙时,我可以看到 WaitForSingleObject 挂起。这意味着线程的 ReleaseSemaphore 不起作用。没有任何东西是 printf'ed 所以应该没有出错。

也许两个线程不应该同时调用 ReleaseSemaphore,但这会使信号量的目的无效......

我只是不明白...

感谢您接受其他同步线程的解决方案!

#define TRY  100
#define LOOP 100

HANDLE *ids;
HANDLE semaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{ 
 float x = 1.0f;   
 while(1)
 { 
  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)
   printf(" ReleaseSemaphore error : %d ", GetLastError());
  SuspendThread(ids[(int) lpParameter]);
 }
 return (DWORD)(int)x;
}

int main()
{
 SYSTEM_INFO sysinfo;
 GetSystemInfo( &sysinfo );
 int numCPU = sysinfo.dwNumberOfProcessors;

 semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL);
 ids = new HANDLE[numCPU];

 for (int j=0 ; j<numCPU ; j++)
  ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL);

 for (int j=0 ; j<TRY ; j++)
 {
  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }
  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);
 }
 CloseHandle(semaphore);
 printf("Done\n");
 getc(stdin);
}
4

5 回答 5

5

我一直使用线程安全队列,而不是使用信号量(至少直接)或让 main 显式唤醒线程以完成某些工作。当 main 想要一个工作线程做某事时,它会将要完成的作业的描述推送到队列中。每个工作线程只做一个工作,然后尝试从队列中弹出另一个工作,并最终挂起,直到队列中有一个工作可供他们做:

队列的代码如下所示:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif

并且在线程中使用它的代码的粗略等价物看起来像这样。我没有弄清楚你的线程函数在做什么,但它是平方根求和的东西,显然你对线程同步比线程实际做的更感兴趣,目前。

编辑:(基于评论):如果您需要main()等待某些任务完成,做更多的工作,然后分配更多的任务,通常最好通过在每个任务中放置一个事件(例如)来处理它,并让您的线程函数设置事件。修改后的代码看起来像这样(注意队列代码不受影响):

#include "queue.hpp"

#include <iostream>
#include <process.h>
#include <math.h>
#include <vector>

struct task { 
    int val;
    HANDLE e;

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { }
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {}
};

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p);

    task t;
    while ( -1 != (t=q.pop()).val) {
        std::cout << t.val << "\n";
        SetEvent(t.e);
    }
}

int main() { 
    queue<task> jobs;

    enum { thread_count = 4 };
    enum { task_count = 10 };

    std::vector<HANDLE> threads;
    std::vector<HANDLE> events;

    std::cout << "Creating thread pool" << std::endl;
    for (int t=0; t<thread_count; ++t)
        threads.push_back((HANDLE)_beginthread(process, 0, &jobs));
    std::cout << "Thread pool Waiting" << std::endl;

    std::cout << "First round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+1);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE);

    events.clear();

    std::cout << "Second round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+20);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE);
    events.clear();

    for (int j=0; j<thread_count; ++j)
        jobs.push(-1);

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE);

    return 0;
}
于 2010-03-03T22:07:07.083 回答
3

我不明白代码,但线程同步肯定很糟糕。您假设线程将按特定顺序调用 SuspendThread()。成功的 WaitForSingleObject() 调用不会告诉您哪个线程调用了 ReleaseSemaphore()。因此,您将在未挂起的线程上调用 ReleaseThread()。这很快就会使程序陷入僵局。

另一个不好的假设是,在 WFSO 返回后,线程已经调用了 SuspendThread。通常是的,但并非总是如此。线程可以在 RS 调用之后立即被抢占。您将再次在未挂起的线程上调用 ReleaseThread()。这通常需要一天左右的时间才能使您的程序陷入僵局。

而且我认为有一个 ReleaseSemaphore 调用太多了。毫无疑问,试图解开它。

你不能用 Suspend/ReleaseThread() 控制线程,不要尝试。

于 2010-03-03T21:59:57.433 回答
3

问题发生在以下情况:

主线程恢复工作线程:

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

工作线程完成他们的工作并释放信号量:

  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)

主线程等待所有工作线程并重置信号量:

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);

主线程进入下一轮,试图恢复工作线程(请注意,工作线程尚未自行暂停事件!这是问题开始的地方......您正在尝试恢复不一定暂停的线程然而):

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

最后,工作线程暂停自己(尽管他们应该已经开始下一轮):

  SuspendThread(ids[(int) lpParameter]);

并且主线程永远等待,因为现在所有工作人员都被挂起:

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);

这是一个链接,显示如何正确解决生产者/消费者问题:

http://en.wikipedia.org/wiki/Producer-consumer_problem

我也认为关键部分比信号量和互斥锁快得多。在大多数情况下(imo),它们也更容易理解。

于 2010-03-03T22:23:28.980 回答
0

问题是你等待的频率比你发出信号的频率高。

for (int j=0 ; j<TRY ; j++)循环等待信号量八次,而四个线程每个只会发出一次信号,而循环本身会发出一次信号。第一次通过循环时,这不是问题,因为信号量的初始计数为 4。第二次和以后的每一次,你都在等待太多的信号。这可以通过在前四个等待中限制时间并且不重试错误这一事实来缓解。所以有时它可能会起作用,有时你的等待会挂起。

我认为以下(未经测试的)更改会有所帮助。

将信号量初始化为零计数:

semaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

摆脱线程恢复循环中的等待(即删除以下内容):

   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)  
      printf("Timed out !!!\n");  

从 try 循环的末尾删除无关的信号(即删除以下内容):

ReleaseSemaphore(semaphore,numCPU,NULL);
于 2010-03-03T22:30:24.427 回答
0

这是一个实用的解决方案。

我希望我的主程序使用线程(然后使用多个内核)来处理作业并等待所有线程完成,然后再恢复并执行其他操作。我不想让线程死亡并创建新线程,因为这很慢。在我的问题中,我试图通过暂停线程来做到这一点,这似乎很自然。但正如 nobugz 指出的那样,“你可以使用 Suspend/ReleaseThread() 控制线程”。

该解决方案涉及信号量,例如我用来控制线程的信号量。实际上,还有一个信号量用于控制主线程。现在我每个线程有一个信号量来控制线程和一个信号量来控制主线程。

这是解决方案:

#include <windows.h>
#include <stdio.h>
#include <math.h>
#include <process.h>

#define TRY  500000
#define LOOP 100

HANDLE *ids;
HANDLE *semaphores;
HANDLE allThreadsSemaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{   
    float x = 1.0f;         
    while(1)
    {   
        WaitForSingleObject(semaphores[(int)lpParameter],INFINITE);
        for (int i=1 ; i<LOOP ; i++)
            x = sqrt((float)i*x+rand());
        ReleaseSemaphore(allThreadsSemaphore,1,NULL);
    }
    return (DWORD)(int)x;
}

int main()
{
    SYSTEM_INFO sysinfo;
    GetSystemInfo( &sysinfo );
    int numCPU = sysinfo.dwNumberOfProcessors;

    ids = new HANDLE[numCPU];
    semaphores = new HANDLE[numCPU]; 

    for (int j=0 ; j<numCPU ; j++)
    {
        ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, NULL, NULL);
        // Threads blocked until main releases them one by one
        semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL);
    }
    // Blocks main until threads finish
    allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

    for (int j=0 ; j<TRY ; j++)
    {
        for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs
            ReleaseSemaphore(semaphores[i],1,NULL);
        for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish
            WaitForSingleObject(allThreadsSemaphore,INFINITE);
    }
    for (int j=0 ; j<numCPU ; j++)
        CloseHandle(semaphores[j]);
    CloseHandle(allThreadsSemaphore);
    printf("Done\n");
    getc(stdin);
}
于 2010-03-04T14:23:19.587 回答