2

我的应用程序由主进程和两个线程组成,所有线程都同时运行并使用三个 fifo 队列:

fifo-q 是 Qmain、Q1 和 Q2。在内部,每个队列都使用一个计数器,该计数器在将项目放入队列时递增,并在从队列中“获取”项目时递减。

处理涉及两个线程,
QMaster,从Q1和Q2获取,放入Qmain,
Monitor,放入Q2,
主进程,从Qmain获取,放入Q1。

QMaster-thread 循环连续检查 Q1 和 Q2 的计数,如果有任何项目在 q 中,它会获取它们并将它们放入 Qmain。

Monitor-thread循环从外部获取数据,打包后放入Q2。

应用程序的主进程还运行一个循环检查 Qmain 的计数,如果有任何项目,则在循环的每次迭代中从 Qmain 获取一个项目并进一步处理它。在此处理期间,它偶尔会将项目放入 Q1 以供稍后处理(当它依次从 Qmain 获取时)。

问题:
我已经按照上述方式实现了所有功能,它在随机(短)时间内工作,然后挂起。我已经设法确定崩溃的来源发生在 fifo-q 计数的递增/递减中(它可能发生在其中任何一个中)。

我尝试过:
使用三个互斥锁:QMAIN_LOCK、Q1_LOCK 和 Q2_LOCK,只要在相关的 fifo-q 上完成任何 get/put 操作,我就会锁定它们。结果:应用程序无法运行,只是挂起。

主进程必须一直持续运行,不能在“读取”时被阻塞(命名管道失败,套接字对失败)。

有什么建议吗?
我想我没有正确实施互斥锁,应该怎么做?
(也欢迎任何关于改进上述设计的意见)

[编辑] 下面是进程和 fifo-q-template:
我应该在哪里以及如何放置互斥锁以避免上述问题?

main-process:
...
start thread QMaster
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X) 
            delete X;
    }
    ...
    //at some random time:
    Q2.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        Q2.put(data)
    }
}

QMaster:
{
    while(1)
    {
        if (Q1.count() > 0)
            Qmain.put(Q1.get());

        if (Q2.count() > 0)
            Qmain.put(Q2.get());
    }
}

fifo_q:
template < class X* > class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    }
    item *head, *tail;
    int count;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X x) { item i=new item(); (... adds to tail...); count++; }
    X* get() { X *d = h.data; (...deletes head ...); count--; return d; }
    clear() {...}
};
4

6 回答 6

1

Use the debugger. When your solution with mutexes hangs look at what the threads are doing and you will get a good idea about the cause of the problem.

What is your platform? In Unix/Linux you can use POSIX message queues (you can also use System V message queues, sockets, FIFOs, ...) so you don't need mutexes.

Learn about condition variables. By your description it looks like your Qmaster-thread is busy looping, burning your CPU.

One of your responses suggest you are doing something like:

Q2_mutex.lock()
Qmain_mutex.lock()
Qmain.put(Q2.get())
Qmain_mutex.unlock()
Q2_mutex.unlock()

but you probably want to do it like:

Q2_mutex.lock()
X = Q2.get()
Q2_mutex.unlock()

Qmain_mutex.lock()
Qmain.put(X)
Qmain_mutex.unlock()

and as Gregory suggested above, encapsulate the logic into the get/put.

EDIT: Now that you posted your code I wonder, is this a learning exercise? Because I see that you are coding your own FIFO queue class instead of using the C++ standard std::queue. I suppose you have tested your class really well and the problem is not there.

Also, I don't understand why you need three different queues. It seems that the Qmain queue would be enough, and then you will not need the Qmaster thread that is indeed busy waiting.

About the encapsulation, you can create a synch_fifo_q class that encapsulates the fifo_q class. Add a private mutex variable and then the public methods (put, get, clear, count,...) should be like put(X) { lock m_mutex; m_fifo_q.put(X); unlock m_mutex; }

question: what would happen if you have more than one reader from the queue? Is it guaranteed that after a "count() > 0" you can do a "get()" and get an element?

于 2010-10-14T08:36:25.297 回答
1

我在下面写了一个简单的应用程序:

#include <queue>
#include <windows.h>
#include <process.h>
using namespace std;

queue<int> QMain, Q1, Q2;
CRITICAL_SECTION csMain, cs1, cs2;

unsigned  __stdcall TMaster(void*)
{
    while(1)
    {
        if( Q1.size() > 0)
        {
            ::EnterCriticalSection(&cs1);
            ::EnterCriticalSection(&csMain);
            int i1 = Q1.front();
            Q1.pop();
            //use i1;
            i1 = 2 * i1;
            //end use;
            QMain.push(i1);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs1);
        }
        if( Q2.size() > 0)
        {
            ::EnterCriticalSection(&cs2);
            ::EnterCriticalSection(&csMain);
            int i1 = Q2.front();
            Q2.pop();
            //use i1;
            i1 = 3 * i1;
            //end use;
            QMain.push(i1);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs2);
        }
    }
    return 0;
}

unsigned  __stdcall TMoniter(void*)
{
    while(1)
    {
        int irand = ::rand();
        if ( irand % 6 >= 3)
        {
            ::EnterCriticalSection(&cs2);
            Q2.push(irand % 6);
            ::LeaveCriticalSection(&cs2);
        }
    }
    return 0;
}

unsigned  __stdcall TMain(void)
{
    while(1)
    {
        if (QMain.size() > 0)
        {
            ::EnterCriticalSection(&cs1);
            ::EnterCriticalSection(&csMain);
            int i = QMain.front();
            QMain.pop();
            i = 4 * i;
            Q1.push(i);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs1);
        }
    }
    return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
    ::InitializeCriticalSection(&cs1);
    ::InitializeCriticalSection(&cs2);
    ::InitializeCriticalSection(&csMain);
    unsigned threadID;
    ::_beginthreadex(NULL, 0, &TMaster, NULL, 0, &threadID);
    ::_beginthreadex(NULL, 0, &TMoniter, NULL, 0, &threadID);
    TMain();

    return 0;
}
于 2010-10-14T10:00:48.593 回答
1

当您已经锁定了一个互斥锁时,您不应该再锁定第二个互斥锁。

由于问题是用 C++ 标记的,我建议在队列类的获取/添加逻辑中实现锁定(例如,使用升压锁),或者如果您的队列不是一个类,则编写一个包装器。

这允许您简化锁定逻辑。

关于您添加的来源:队列大小检查和以下 put/get 应该在一个事务中完成,否则另一个线程可以在两者之间编辑队列

于 2010-10-14T08:04:01.960 回答
1

我将如何调整设计并以 posix 方式锁定队列访问的示例。请注意,我将包装互斥锁以使用 RAII 或使用 boost-threading,并且我将使用 stl::deque 或 stl::queue 作为队列,但尽可能接近您的代码:

main-process:
...
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X) 
            delete X;
    }
    ...
    //at some random time:
    QMain.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        QMain.put(data)
    }
}

fifo_q:
template < class X* > class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    }
    item *head, *tail;
    int count;
    pthread_mutex_t m;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X x) 
    { 
      pthread_mutex_lock(&m);
      item i=new item(); 
      (... adds to tail...); 
      count++; 
      pthread_mutex_unlock(&m);
    }
    X* get() 
    { 
      pthread_mutex_lock(&m);
      X *d = h.data; 
      (...deletes head ...); 
      count--; 
      pthread_mutex_unlock(&m);
      return d; 
    }
    clear() {...}
};

还要注意,互斥锁仍然需要像这里的示例一样初始化,并且 count() 也应该使用互斥锁

于 2010-10-14T10:05:43.537 回答
0

您是否同时获得多个锁?这通常是您要避免的。如果必须,请确保您始终在每个线程中以相同的顺序获取锁(这对您的并发性更具限制性,并且您通常希望避免它)。

其他并发建议:您是否在读取队列大小之前获取锁?如果您使用互斥锁来保护队列,那么您的队列实现不是并发的,您可能需要在读取队列大小之前获取锁。

于 2010-10-14T07:49:15.130 回答
0

由于此规则“主进程必须始终继续运行,不得在‘读取’时阻塞”,可能会出现 1 个问题。你是如何实施的?'get' 和 'read' 和有什么不一样?

问题似乎出在您的实施中,而不是在逻辑中。正如您所说,您不应该处于任何死锁状态,因为无论是否处于锁中,您都没有获得另一个锁。

于 2010-10-14T08:24:09.657 回答