0

有一个下载器应用程序,它在多个线程中对下载项目执行不同类型的处理。一些线程分析输入数据,一些执行下载、提取、保存状态等。因此,每种类型的线程都对某些数据成员进行操作,其中一些线程可能同时运行。下载项可以这样描述:

class File;

class Download
{
public:
    enum State
    {
        Parsing, Downloading, Extracting, Repairing, Finished
    };

    Download(const std::string &filePath): filePath(filePath) { }

    void save()
    {
        // TODO: save data consistently

        StateFile f;    // state file for this download

        // save general download parameters
        f << filePath << state << bytesWritten << totalFiles << processedFiles;

        // Now we are to save the parameters of the files which belong to this download,
        // (!) but assume the downloading thread kicks in, downloads some data and 
        // changes the state of a file. That causes "bytesWritten", "processedFiles" 
        // and "state" to be different from what we have just saved.

        // When we finally save the state of the files their parameters don't match 
        // the parameters of the download (state, bytesWritten, processedFiles).
        for (File *f : files)
        {
            // save the file...
        }
    }

private:
    std::string filePath;
    std::atomic<State> state = Parsing;
    std::atomic<int> bytesWritten = 0;
    int totalFiles = 0;
    std::atomic<int> processedFiles = 0;
    std::mutex fileMutex;
    std::vector<File*> files;
};

我想知道如何一致地保存这些数据。例如,状态和已处理文件的数量可能已经保存,我们将保存文件列表。同时其他一些线程可能会改变文件的状态,从而改变处理文件的数量或下载的状态,使保存的数据不一致。

想到的第一个想法是为所有数据成员添加一个互斥锁,并在每次访问它们时锁定。但这可能是低效的,因为大多数时间线程访问不同的数据成员并且在几分钟内只进行一次保存。

在我看来这样的任务在多线程编程中相当普遍,所以我希望有经验的人能提出更好的方法。

4

1 回答 1

0

我建议您使用生产者消费者模式。

下载器向解析器生成并通知其使用,解析器向提取器生成并通知其使用,提取器向修复器生成。然后,您将为每个任务创建一个队列。同步可以使用条件变量进行优化,因此消费者只有在生产了某些东西后才收到通知时才拉动。您最终将使用更少的互斥锁和更具可读性和效率的设计。

这是队列的示例代码,如果您必须同时下载、解析、提取和保存,该怎么做:

#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
template<typename T>
class synchronized_queu
{
public:
    T consume_one()
    {
        std::unique_lock<std::mutex> lock(lock_);
        while (queue_.size() == 0)
            condition_.wait(lock); //release and obtain again
        T available_data = queue_.front();
        queue_.pop();
        return available_data;
    }
    void produce_one(const T& data)
    {
        std::unique_lock<std::mutex> lock(lock_);
        queue_.push(data);
        condition_.notify_one();//notify only one or all as per your design...
    }
private:
    std::queue<T> queue_;
    std::mutex lock_;
    std::condition_variable condition_;
};
struct data
{
    //.....
};

void download(synchronized_queu<data>& q)
{
    //...
    data data_downloaded = ; //data downloaded;
    q.produce_one(data_downloaded);
}

void parse(synchronized_queu<data>& q1, synchronized_queu<data>& q2)
{
    //...
    data data_downloaded = q1.consume_one();
    //parse
    data data_parsed = ;//....
    q2.produce_one(data_parsed);
}

void extract(synchronized_queu<data>& q1, synchronized_queu<data>& q2)
{
    //...
    data data_parsed = q1.consume_one();
    //parse
    data data_extracted = ;//....
    q2.produce_one(data_extracted);
}
void save(synchronized_queu<data>& q)
{
    data data_extracted = q.consume_one();
    //save....
}

int main()
{
    synchronized_queu<data> dowlowded_queue;
    synchronized_queu<data> parsed_queue;
    synchronized_queu<data> extracted_queue;

    std::thread downloader(download, dowlowded_queue);
    std::thread parser(parse, dowlowded_queue, parsed_queue);
    std::thread extractor(extract, parsed_queue, extracted_queue);
    std::thread saver(save, extracted_queue);
    while (/*condition to stop program*/)
    {

    }
    downloader.join();
    parser.join();
    extractor.join();
    saver.join();
    return 0;
}
于 2016-10-01T07:59:22.213 回答