1

我实现了这样的代码,以便在不同线程上运行的多个实例使用读写锁和 shared_ptr 读取其他实例的数据。看起来不错,但我对此不是 100% 确定的,我提出了一些关于这些用法的问题。

细节

我有一个名为 Chunk 的类的多个实例,每个实例在专用线程中进行一些计算。一个块需要读取相邻块的数据以及它自己的数据,但它不写入邻居的数据,所以使用读写锁。此外,可以在运行时设置邻居。例如,我可能想在运行时设置一个不同的邻居块,有时只是 nullptr。也可以在运行时删除块。可以使用原始指针,但我认为 shared_ptr 和 weak_ptr 更适合这个,以便跟踪生命周期。shared_ptr 中自己的数据和weak_ptr 中邻居的数据。

我在下面提供了我的代码的更简单版本。ChunkData 有数据和一个互斥体。我使用 InitData 进行数据初始化,之后在专用线程中调用 DoWork 函数。其他函数可以从主线程调用。这似乎有效,但我不是那么自信。特别是关于在多个线程中使用 shared_ptr 。

  1. 如果一个线程调用 shared_ptr 的 reset()(在 ctor 和 InitData 中)而其他线程将它与 weak_ptr 的锁(在 DoWork 中)一起使用,会发生什么?这需要锁dataMutex或chunkMutex吗?

  2. 复制(在 SetNeighbour 中)怎么样?我也需要锁吗?

我认为其他部分还可以,但是如果您发现任何危险,请告诉我。感谢。

顺便说一句,我考虑过存储 Chunk 的 shared_ptr 而不是 ChunkData,但决定不使用这种方法,因为我不管理的内部代码具有 GC 系统,它可以在我不期望的时候删除指向 Chunk 的指针它。

class Chunk
{
public:
    class ChunkData
    {
    public:

        shared_mutex dataMutex; // mutex to read/write data
        int* data;
        int size;

        ChunkData() : data(nullptr) { }

        ~ChunkData()
        {
            if (data)
            {
                delete[] data;
                data = nullptr;
            }
        }
    };

private:
    mutex chunkMutex;   // mutex to read/write member variables
    shared_ptr<ChunkData> chunkData;
    weak_ptr<ChunkData> neighbourChunkData;
    string result;

public:
    Chunk(string _name)
        : chunkData(make_shared<ChunkData>())
    {
    }

    ~Chunk()
    {
        EndProcess();
        unique_lock lock(chunkMutex);   // is this needed?
        chunkData.reset();
    }

    void InitData(int size)
    {
        ChunkData* NewData = new ChunkData();
        NewData->size = size;
        NewData->data = new int[size];

        {
            unique_lock lock(chunkMutex);   // is this needed?
            chunkData.reset(NewData);
            cout << "init chunk " << name << endl;
        }
    }

    // This is executed in other thread. e.g. thread t(&Chunk::DoWork, this);
    void DoWork()
    {
        lock_guard lock(chunkMutex); // we modify some members such as result(string) reading chunk data, so need this.
        if (chunkData)
        {
            shared_lock readLock(chunkData->dataMutex);
            if (chunkData->data)
            {
                // read chunkData->data[i] and modify some members such as result(string)
                for (int i = 0; i < chunkData->size; ++i)
                {
                    // Is this fine, or should I write data result outside of readLock scope?
                    result += to_string(chunkData->data[i]) + " ";
                }
            }
        }
        // does this work?
        if (shared_ptr<ChunkData> neighbour = neighbourChunkData.lock())
        {
            shared_lock readLock(neighbour->dataMutex);
            if (neighbour->data)
            {
                // read neighbour->data[i] and modify some members as above
            }
        }
    }

    shared_ptr<ChunkData> GetChunkData()
    {
        unique_lock lock(chunkMutex);
        return chunkData;
    }

    void SetNeighbour(Chunk* neighbourChunk)
    {
        if (neighbourChunk)
        {
            // safe?
            shared_ptr<ChunkData> newNeighbourData = neighbourChunk->GetChunkData();
            unique_lock lock(chunkMutex);   // lock for chunk properties
            {
                shared_lock readLock(newNeighbourData->dataMutex);  // not sure if this is needed.
                neighbourChunkData = newNeighbourData;
            }
        }
    }

    int GetDataAt(int index)
    {
        shared_lock readLock(chunkData->dataMutex);
        if (chunkData->data && 0 <= index && index < chunkData->size)
        {
            return chunkData->data[index];
        }
        return 0;
    }

    void SetDataAt(int index, int element)
    {
        unique_lock writeLock(chunkData->dataMutex);
        if (chunkData->data && 0 <= index && index < chunkData->size)
        {
            chunkData->data[index] = element;
        }
    }
};

编辑 1

我为 DoWork 函数添加了更多细节。读取块数据并在函数中编辑块的成员变量。

在 Homer512 的回答之后,我提出了其他问题。

A)在 DoWork 函数中,我在读锁中写入了一个成员变量。我是否应该只读取读锁范围内的数据,如果我需要根据读取的数据修改其他数据,是否必须在读锁之外进行?例如,将整个数组复制到读锁中的局部变量,并使用本地修改读锁之外的其他成员。

B)我跟随 Homer512 并修改 GetDataAt/SetDataAt 如下。在解锁 chunkMutex 之前,我会读/写 lock chunkData->dataMutex。我也在 DoWork 函数中这样做。我应该单独做锁吗?例如,创建一个局部变量 shared_ptr 并在 chunkMutex 锁中设置 chunkData 给它,解锁它,然后最后读/写锁定该局部变量的 dataMutex 和读/写数据。

    int GetDataAt(int index)
    {
        lock_guard chunkLock(chunkMutex);
        shared_lock readLock(chunkData->dataMutex);
        if (chunkData->data && 0 <= index && index < chunkData->size)
        {
            return chunkData->data[index];
        }
        return 0;
    }

    void SetDataAt(int index, int element)
    {
        lock_guard chunkLock(chunkMutex);
        unique_lock writeLock(chunkData->dataMutex);
        if (chunkData->data && 0 <= index && index < chunkData->size)
        {
            chunkData->data[index] = element;
        }
    }
4

1 回答 1

1

我有几点意见:

  1. ~ChunkData:您可以更改您的data成员 fromint*unique_ptr<int[]>获得相同的结果,而无需显式析构函数。您的代码是正确的,只是不太方便。

  2. ~Chunk:我认为您不需要锁或调用重置方法。到析构函数运行时,根据定义,没有人应该拥有对 Chunk 对象的引用。所以锁永远不会有争议。并且重置是不必要的,因为shared_ptr析构函数会处理它。

  3. InitData:是的,需要锁,因为 InitData 可以与 DoWork 竞争。您可以通过将 InitData 移动到构造函数来避免这种情况,但我认为这种划分是有原因的。您也可以更改shared_ptrstd::atomic<std::shared_ptr<ChunkData> >以避免锁定。

  4. 像这样编写 InitData 效率更高:

void InitData(int size)
{
    std::shared_ptr<ChunkData> NewData = std::make_shared<ChunkData>();
    NewData->size = size;
    NewData->data = new int[size]; // or std::make_unique<int[]>(size)
    {
        std::lock_guard<std::mutex> lock(chunkMutex);
        chunkData.swap(NewData);
    }
    // deletes old chunkData outside locked region if it was initialized before 
}

make_shared避免为引用计数器分配额外的内存。这也将所有分配和释放移出临界区。

  1. DoWork:您的评论“准备好 chunkData->data[i] 并修改一些成员”。你只拿一个shared_lock但说你修改成员。好吧,它是阅读还是写作?或者您的意思是说您修改了 Chunk 而不是 ChunkData,而 Chunk 受其自己的互斥体保护?

  2. SetNeighbour:你需要锁定你自己的chunkMutex和邻居的。您不应该同时锁定两者以避免用餐哲学家的问题(尽管std::lock解决了这个问题)。

    void SetNeighbour(Chunk* neighbourChunk)
    {
        if(! neighbourChunk)
            return;
        std::shared_ptr<ChunkData> newNeighbourData;
        {
            std::lock_guard<std::mutex> lock(neighbourChunk->chunkMutex);
            newNeighbourData = neighbourChunk->chunkData;
        }
        std::lock_guard<std::mutex> lock(this->chunkMutex);
        this->neighbourChunkData = newNeighbourData;
    }
  1. GetDataAtSetDataAt:你需要锁定 chunkMutex。否则你可能会与 InitData 竞争。没有必要使用std::lock,因为锁的顺序永远不会交换。

编辑1:

  1. DoWork: 这条线if (shared_ptr<ChunkData> neighbour = neighbourChunkData.lock())不能让邻居活着。将变量声明移出 if 以保留引用。

编辑:替代设计方案

我担心的是,如果 InitData 仍在运行或等待运行,您的 DoWork 可能无法继续。你想如何处理这个问题?我建议你可以等到工作完成。像这样的东西:

class Chunk
{
    std::mutex chunkMutex;
    std::shared_ptr<ChunkData> chunkData;
    std::weak_ptr<ChunkData> neighbourChunkData;
    std::condition_variable chunkSet;

    void waitForChunk(std::unique_lock<std::mutex>& lock)
    {
        while(! chunkData)
            chunkSet.wait(lock);
    }
public:
    // modified version of my code above
    void InitData(int size)
    {
        std::shared_ptr<ChunkData> NewData = std::make_shared<ChunkData>();
        NewData->size = size;
        NewData->data = new int[size]; // or std::make_unique<int[]>(size)
        {
            std::lock_guard<std::mutex> lock(chunkMutex);
            chunkData.swap(NewData);
        }
        chunkSet.notify_all();
    }
    void DoWork()
    {
        std::unique_lock<std::mutex> ownLock(chunkMutex);
        waitForChunk(lock); // blocks until other thread finishes InitData
        {
            shared_lock readLock(chunkData->dataMutex);
            ...
        }
        
        shared_ptr<ChunkData> neighbour = neighbourChunkData.lock();
        if(! neighbour)
            return;
        shared_lock readLock(neighbour->dataMutex);
        ...
    }
    void SetNeighbour(Chunk* neighbourChunk)
    {
        if(! neighbourChunk)
            return;
        shared_ptr<ChunkData> newNeighbourData;
        {
            std::unique_lock<std::mutex> lock(neighbourChunk->chunkMutex);
            neighbourChunk->waitForChunk(lock); // wait until neighbor has finished InitData
            newNeighbourData = neighbourChunk->chunkData;
        }
        std::lock_guard<std::mutex> ownLock(this->chunkMutex);
        this->neighbourChunkData = std::move(newNeighbourData);
    }
};

这样做的缺点是,如果从未调用过 InitData 或者它因异常而失败,您可能会死锁。有一些方法可以解决这个问题,比如使用std::shared_future知道它是有效的(在计划 InitData 时设置)以及它是否失败(记录关联promise或的异常packaged_task)。

于 2022-01-16T11:34:41.120 回答