我有以下代码用于从队列中推送和挂起。调用方代码有多个 MsgQ 对象。_notFull->wait()
Push和Pend 函数可能正在等待_notEmpty->wait()
条件等待。这些等待受 _mut 互斥体保护。notFull 和 notEmpty 等待对empty
andfull
变量进行操作。
当调用析构函数时,会在_deleteQueue
内部调用 ,我想从中向等待线程发出信号以进行清理并停止等待信号到来。完成后,我将删除我的对象。但是,在_deleteQueue
函数中,当我尝试做时_mut->acquire()
,我无法获取互斥锁。即使我忽略了获取,我也无法broadcast
处理这些等待线程。我哪里错了?
谢谢,维克拉姆。
MsgQ::~MsgQ()
{
_deleteQueue();
delete _mut;_mut=NULL;
delete _notFull;_notFull=NULL;
delete _notEmpty;_notEmpty=NULL;
delete _PostMutex; _PostMutex = NULL;
delete _PendMutex; _PendMutex = NULL;
delete _PostInProgressMutex; _PostInProgressMutex = NULL;
delete _PendInProgressMutex; _PendInProgressMutex = NULL;
delete _DisconnectMutex; _DisconnectMutex = NULL;
free( _ptrQueue ); _ptrQueue = NULL;
}
int MsgQ::Post(Message* msg)
{
_PostMutex->acquire();
_postInProgress++;
_PostMutex->release();
if (msg)
msg->print();
_mut->acquire();
while (full)
{
_notFull->wait();
}
if (!_disconnectInProgress)
_queuePush(msg);
_mut->release();
_PostMutex->acquire();
_postInProgress--;
if (_postInProgress==0)
{
_PostInProgressMutex->signal();
}
_PostMutex->release();
return _notEmpty->signal();
}
int MsgQ::Pend(Message*& msg)
{
_PendMutex->acquire();
_pendInProgress++;
_PendMutex->release();
_mut->acquire();
while (empty)
_notEmpty->wait();
if (!_disconnectInProgress)
{
_queuePop(msg);
}
_mut->release();
_PendMutex->acquire();
_pendInProgress--;
if (_pendInProgress == 0)
{
_PendInProgressMutex->signal();
}
_PendMutex->release();
return _notFull->signal();
}
void MsgQ::_deleteQueue ()
{
_PostMutex->acquire();
if (_postInProgress != 0)
{
_PostMutex->release();
TRACE("Acquiring Mutex.");
_mut->acquire();
full = 0;
_notFull->broadcast();
_mut->release();
_PostInProgressMutex->wait();
}
else
{
_PostMutex->release();
}
_PendMutex->acquire();
if (_pendInProgress != 0)
{
_PendMutex->release();
TRACE("Acquiring Mutex.");
_mut->acquire();
empty = 0;
_notEmpty->broadcast();
_mut->release();
_PendInProgressMutex->wait();
}
else
{
_PendMutex->release();
}
}
void MsgQ::_initQueue()
{
_ptrQueue = (Message **)(malloc (size * sizeof (Message*)));
if (_ptrQueue == NULL)
{
cout << "queue could not be created!" << endl;
}
else
{
for (int i = 0; i < size; i++)
*(_ptrQueue + i) = NULL;
empty = 1;
full = 0;
head = 0;
tail = 0;
try{
_mut = new ACE_Mutex() ;
_notFull = new ACE_Condition<ACE_Mutex>(*_mut);
_notEmpty = new ACE_Condition<ACE_Mutex>(*_mut);
_PostMutex = new ACE_Mutex();
_PendMutex = new ACE_Mutex();
_PostInProgressMutex = new ACE_Condition<ACE_Mutex>(*_PostMutex);
_PendInProgressMutex = new ACE_Condition<ACE_Mutex>(*_PendMutex);
_DisconnectMutex = new ACE_Mutex();
_postInProgress = 0;
_pendInProgress = 0;
_disconnectInProgress = false;
}catch(...){
cout << "you should not be here" << endl;
}
}
}