0

我已经实现了一个“Ticket”类,它在多个线程之间作为 shared_ptr 共享。

程序流程是这样的:

  1. 调用 parallelQuery() 来启动一个新的查询作业。创建一个 Ticket 的共享实例。
  2. 查询被分成多个任务,每个任务都在一个工作线程上排队(这部分很重要,否则我只需加入线程并完成)。每个任务都会获得共享票证。
  3. 调用 ticket.wait() 以等待作业的所有任务完成。
  4. 当一项任务完成时,它会调用工单上的 done() 方法。
  5. 当所有任务完成后,票证被解锁,来自任务的结果数据聚合并从 parallelQuery() 返回

在伪代码中:

     std::vector<T> parallelQuery(std::string str) {
         auto ticket = std::make_shared<Ticket>(2);
         auto task1 = std::make_unique<Query>(ticket, str+"a");
         addTaskToWorker(task1);
         auto task2 = std::make_unique<Query>(ticket, str+"b");
         addTaskToWorker(task2);
         ticket->waitUntilDone();
         auto result = aggregateData(task1, task2);
         return result;
     }

我的代码有效。但是我想知道如果在调用waitUntilDone()的服务员线程再次锁定互斥锁之前执行解锁互斥锁,理论上是否有可能导致死锁。

这是一种可能性吗,如何避免这个陷阱?

这里是完整的Ticket类,注意上面问题描述相关的执行顺序示例注释:

#include <mutex>
#include <atomic>

    class Ticket {
    public:
        Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) {
            _mutex.lock();
        }

        void waitUntilDone() {
            _doneLock.lock();
            if (_done != _numTasks) {
                _doneLock.unlock(); // Execution order 1: "waiter" thread is here
                _mutex.lock(); // Execution order 3: "waiter" thread is now in a dealock?
            }
            else {
                _doneLock.unlock();
            }
        }

        void done() {
            _doneLock.lock();
            _done++;
            if (_done == _numTasks) {
                _mutex.unlock(); // Execution order 2: "task1" thread unlocks the mutex
            }
            _doneLock.unlock();
        }

        void cancel() {
            _canceled = true;
            _mutex.unlock();
        }

        bool wasCanceled() {
            return _canceled;
        }

        bool isDone() {
            return _done >= _numTasks;
        }

        int getNumTasks() {
            return _numTasks;
        }

    private:
        std::atomic<int> _numTasks;
        std::atomic<int> _done;
        std::atomic<bool> _canceled;
        // mutex used for caller wait state
        std::mutex _mutex;
        // mutex used to safeguard done counter with lock condition in waitUntilDone
        std::mutex _doneLock;
    };

编辑问题时我想到的一种可能的解决方案是我可以把 _done++; 在 _doneLock() 之前。最终,这应该足够了吗?

更新

我已经根据 Tomer 和 Phil1970 提供的建议更新了 Ticket 类。以下实现是否避免了提到的陷阱?

class Ticket {
public:
    Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) { }

    void waitUntilDone() {
        std::unique_lock<std::mutex> lock(_mutex);
        // loop to avoid spurious wakeups
        while (_done != _numTasks && !_canceled) {
            _condVar.wait(lock);
        }
    }

    void done() {
        std::unique_lock<std::mutex> lock(_mutex);
        // just bail out in case we call done more often than needed
        if (_done == _numTasks) {
            return;
        }
        _done++;
        _condVar.notify_one();
    }

    void cancel() {
        std::unique_lock<std::mutex> lock(_mutex);
        _canceled = true;
        _condVar.notify_one();
    }

    const bool wasCanceled() const {
        return _canceled;
    }

    const bool isDone() const {
        return _done >= _numTasks;
    }

    const int getNumTasks() const {
        return _numTasks;
    }

private:
    std::atomic<int> _numTasks;
    std::atomic<int> _done;
    std::atomic<bool> _canceled;
    std::mutex _mutex;
    std::condition_variable _condVar;
};
4

2 回答 2

3

不要编写自己的等待方法,而是使用std::condition_variable

https://en.cppreference.com/w/cpp/thread/condition_variable

互斥锁的使用

通常, amutex应该保护给定的代码区域。也就是说,它应该锁定、完成工作并解锁。在您的班级中,您有多种方法,其中一些锁定_mutex而另一些解锁。这很容易出错,就好像您以错误的顺序调用该方法一样,您很可能处于不一致的状态。如果互斥锁被锁定两次会发生什么?还是在已经解锁时解锁?

使用互斥锁要注意的另一件事是,如果您有多个互斥锁,如果您需要锁定两个互斥锁但不以一致的顺序执行,则很容易出现死锁。假设线程 A 先锁定 mutex 1 和 mutex 2,然后线程 B 以相反的顺序锁定它们(首先是 mutex 2)。有可能发生这样的事情:

  • 线程 A 锁互斥体 1
  • 线程 B 锁定互斥锁 2
  • 线程 A 想要锁定互斥锁 2,但不能,因为它已经被锁定。
  • 线程 B 想要锁定互斥锁 1,但不能,因为它已经被锁定。
  • 两个线程将永远等待

因此,在您的代码中,您至少应该进行一些检查以确保正确使用。例如,您应该_canceled在解锁互斥锁之前进行验证,以确保cancel只调用一次。

解决方案

我只会给出一些想法

声明一个 mutux 和一个 condition_variable 来管理类中的完成条件。

std::mutex doneMutex;
std::condition_variable done_condition;

然后waitUntilDone看起来像:

void waitUntilDone()
{
    std::unique_lock<std::mutex> lk(doneMutex);
    done_condition.wait(lk, []{ return isDone() || wasCancelled();});
}

done功能看起来像:

void done() 
{
    std::lock_guard<std::mutex> lk(doneMutex);
    _done++;
    if (_done == _numTasks) 
    {
        doneCondition.notify_one();
    }
}

功能cancel会变成

void done() 
{
    std::lock_guard<std::mutex> lk(doneMutex);
    _cancelled = true;
    doneCondition.notify_one();
}

如您所见,您现在只有一个互斥锁,因此您基本上消除了死锁的可能性。

变量命名

我建议您不要在互斥锁的名称中使用lock,因为它会造成混淆。

std::mutex someMutex;
std::guard_lock<std::mutex> someLock(someMutex); // std::unique_lock when needed

这样,就更容易知道哪个变量引用互斥锁以及哪个变量引用互斥锁的锁。

好读

如果你对多线程很认真,那么你应该买那本书:

实践中的 C++ 并发
实用多线程
Anthony Williams

代码审查 (添加部分)

基本相同的代码已发布到代码审查:https ://codereview.stackexchange.com/questions/225863/multithreading-ticket-class-to-wait-for-parallel-task-completion/225901#225901 。

我在那里给出了一个答案,其中包括一些额外的点。

于 2019-08-10T13:47:45.487 回答
0

您不需要使用互斥锁来操作原子值

UPD

我对 mainn 问题的回答是错误的。我删了一个。

您可以使用简单(非原子) int _numTasks; 还。而且您不需要共享指针 - 只需在堆栈上创建任务并传递指针

     Ticket ticket(2);
     auto task1 = std::make_unique<Query>(&ticket, str+"a");
     addTaskToWorker(task1);

或者如果你喜欢独特的ptr

     auto ticket = std::make_unique<Ticket>(2);
     auto task1 = std::make_unique<Query>(ticket.get(), str+"a");
     addTaskToWorker(task1);

因为共享指针可以被奥卡姆剃刀切割:)

于 2019-08-10T12:02:50.553 回答