我已经实现了一个“Ticket”类,它在多个线程之间作为 shared_ptr 共享。
程序流程是这样的:
- 调用 parallelQuery() 来启动一个新的查询作业。创建一个 Ticket 的共享实例。
- 查询被分成多个任务,每个任务都在一个工作线程上排队(这部分很重要,否则我只需加入线程并完成)。每个任务都会获得共享票证。
- 调用 ticket.wait() 以等待作业的所有任务完成。
- 当一项任务完成时,它会调用工单上的 done() 方法。
- 当所有任务完成后,票证被解锁,来自任务的结果数据聚合并从 parallelQuery() 返回
在伪代码中:
std::vector<T> parallelQuery(std::string str) {
auto ticket = std::make_shared<Ticket>(2);
auto task1 = std::make_unique<Query>(ticket, str+"a");
addTaskToWorker(task1);
auto task2 = std::make_unique<Query>(ticket, str+"b");
addTaskToWorker(task2);
ticket->waitUntilDone();
auto result = aggregateData(task1, task2);
return result;
}
我的代码有效。但是我想知道如果在调用waitUntilDone()的服务员线程再次锁定互斥锁之前执行解锁互斥锁,理论上是否有可能导致死锁。
这是一种可能性吗,如何避免这个陷阱?
这里是完整的Ticket类,注意上面问题描述相关的执行顺序示例注释:
#include <mutex>
#include <atomic>
class Ticket {
public:
Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) {
_mutex.lock();
}
void waitUntilDone() {
_doneLock.lock();
if (_done != _numTasks) {
_doneLock.unlock(); // Execution order 1: "waiter" thread is here
_mutex.lock(); // Execution order 3: "waiter" thread is now in a dealock?
}
else {
_doneLock.unlock();
}
}
void done() {
_doneLock.lock();
_done++;
if (_done == _numTasks) {
_mutex.unlock(); // Execution order 2: "task1" thread unlocks the mutex
}
_doneLock.unlock();
}
void cancel() {
_canceled = true;
_mutex.unlock();
}
bool wasCanceled() {
return _canceled;
}
bool isDone() {
return _done >= _numTasks;
}
int getNumTasks() {
return _numTasks;
}
private:
std::atomic<int> _numTasks;
std::atomic<int> _done;
std::atomic<bool> _canceled;
// mutex used for caller wait state
std::mutex _mutex;
// mutex used to safeguard done counter with lock condition in waitUntilDone
std::mutex _doneLock;
};
编辑问题时我想到的一种可能的解决方案是我可以把 _done++; 在 _doneLock() 之前。最终,这应该足够了吗?
更新
我已经根据 Tomer 和 Phil1970 提供的建议更新了 Ticket 类。以下实现是否避免了提到的陷阱?
class Ticket {
public:
Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) { }
void waitUntilDone() {
std::unique_lock<std::mutex> lock(_mutex);
// loop to avoid spurious wakeups
while (_done != _numTasks && !_canceled) {
_condVar.wait(lock);
}
}
void done() {
std::unique_lock<std::mutex> lock(_mutex);
// just bail out in case we call done more often than needed
if (_done == _numTasks) {
return;
}
_done++;
_condVar.notify_one();
}
void cancel() {
std::unique_lock<std::mutex> lock(_mutex);
_canceled = true;
_condVar.notify_one();
}
const bool wasCanceled() const {
return _canceled;
}
const bool isDone() const {
return _done >= _numTasks;
}
const int getNumTasks() const {
return _numTasks;
}
private:
std::atomic<int> _numTasks;
std::atomic<int> _done;
std::atomic<bool> _canceled;
std::mutex _mutex;
std::condition_variable _condVar;
};