2

我正在编写一个线程应用程序,它将处理资源列表,并且可能会或可能不会将结果项放置在每个资源的容器(std::map)中。资源的处理发生在多个线程中。

将遍历结果容器,每个项目由一个单独的线程处理,该线程接受一个项目并更新 MySQL 数据库(使用 mysqlcppconn API),然后从容器中删除该项目并继续。

为简单起见,这里是逻辑的概述:

queueWorker() - thread
    getResourcesList() - seeds the global queue

databaseWorker() - thread
    commitProcessedResources() - commits results to a database every n seconds

processResources() - thread x <# of processor cores>
    processResource()
    queueResultItem()

以及显示我在做什么的伪实现。

/* not the actual stucts, but just for simplicities sake */
struct queue_item_t {
    int id;
    string hash;
    string text;
};

struct result_item_t {
    string hash; // hexadecimal sha1 digest
    int state;
}

std::map< string, queue_item_t > queue;
std::map< string, result_item_t > results;

bool processResource (queue_item_t *item)
{
    result_item_t result;

    if (some_stuff_that_doesnt_apply_to_all_resources)
    {
        result.hash = item->hash;
        result.state = 1;

        /* PROBLEM IS HERE */
        queueResultItem(result);
    }
}

void commitProcessedResources ()
{
    pthread_mutex_lock(&resultQueueMutex);

    // this can take a while since there

    for (std::map< string, result_item_t >::iterator it = results.begin; it != results.end();)
    {
        // do mysql stuff that takes a while

        results.erase(it++);
    }

    pthread_mutex_unlock(&resultQueueMutex);
}

void queueResultItem (result_item_t result)
{
    pthread_mutex_lock(&resultQueueMutex);

    results.insert(make_pair(result.hash, result));

    pthread_mutex_unlock(&resultQueueMutex);
}

如 processResource() 所示,问题就在那里,当 commitProcessedResources() 正在运行并且 resultQueueMutex 被锁定时,我们将在这里等待 queueResultItem() 返回,因为它会尝试锁定相同的互斥体,因此会等待直到完成,这可能需要一段时间。

显然,由于运行的线程数量有限,因此一旦所有线程都在等待 queueResultItem() 完成,在互斥锁被释放并可用于 queueResultItem() 之前,将不再进行任何工作。

所以,我的问题是我如何最好地实现这一点?是否有一种特定类型的标准容器可以同时插入和删除,或者是否存在我不知道的东西?

每个队列项都可以拥有自己的唯一键并不是绝对必要的,就像 std::map 的情况一样,但我更喜欢它,因为多个资源可以产生相同的结果,我更愿意只发送一个唯一的结果到数据库,即使它确实使用 INSERT IGNORE 忽略任何重复项。

不幸的是,我对 C++ 还很陌生,所以我不知道在 Google 上要寻找什么。:(

4

3 回答 3

7

在处理过程中,您不必一直为队列持有锁commitProcessedResources ()。您可以改为将队列与空队列交换:

void commitProcessedResources ()
{
    std::map< string, result_item_t > queue2;
    pthread_mutex_lock(&resultQueueMutex);
    // XXX Do a quick swap.
    queue2.swap (results);
    pthread_mutex_unlock(&resultQueueMutex);

    // this can take a while since there

    for (std::map< string, result_item_t >::iterator it = queue2.begin();
        it != queue2.end();)
    {
        // do mysql stuff that takes a while

        // XXX You do not need this.
        //results.erase(it++);
    }   
}
于 2012-04-26T15:29:48.567 回答
0

您将需要使用同步方法(即互斥锁)来使其正常工作。但是,并行编程的目标是最小化临界区(即在您持有锁时执行的代码量)。

也就是说,如果您的 MySQL 查询可以在不同步的情况下并行运行(即多个调用不会相互冲突),请将它们从临界区中取出。这将大大减少开销。例如,如下所示的简单重构可以解决问题

void commitProcessedResources ()
{
    // MOVING THIS LOCK

    // this can take a while since there
    pthread_mutex_lock(&resultQueueMutex);
    std::map<string, result_item_t>::iterator end = results.end();
    std::map<string, result_item_t>::iterator begin = results.begin();
    pthread_mutex_unlock(&resultQueueMutex);

    for (std::map< string, result_item_t >::iterator it = begin; it != end;)
    {
        // do mysql stuff that takes a while

        pthread_mutex_lock(&resultQueueMutex); // Is this the only place we need it?
        // This is a MUCH smaller critical section
        results.erase(it++);
        pthread_mutex_unlock(&resultQueueMutex); // Unlock or everything will block until end of loop
    }

    // MOVED UNLOCK
}

这将使您能够跨多个线程同时“实时”访问数据。也就是说,随着每次写入完成,地图都会更新,并且可以使用当前信息在其他地方读取。

于 2012-04-26T15:29:59.573 回答
0

在 C++03 之前,该标准根本没有定义任何关于线程或线程安全的内容(而且由于您使用的是pthreads,我猜这几乎就是您正在使用的内容)。

因此,您可以对共享地图进行锁定,以确保在任何给定时间只有一个线程尝试访问该地图。否则,您可能会破坏其内部数据结构,因此该地图根本不再有效。

或者(我通常更喜欢这个)你可以让你的多个线程将它们的数据放入一个线程安全的队列中,并有一个线程从该队列中获取数据并将其放入映射中。由于它是单线程的,因此您不再需要在使用地图时锁定地图。

在将映射刷新到磁盘时,有一些合理的可能性来处理延迟。可能最简单的方法是从队列中读取相同的线程,插入映射,并定期将映射刷新到磁盘。在这种情况下,当映射被刷新到磁盘时,传入的数据只是位于队列中。这使得对地图的访问变得简单——因为只有一个线程直接接触它,它可以在没有任何锁定的情况下使用地图。

另一个是有两张地图。在任何给定时间,刷新到磁盘的线程都会获得一个映射,而从队列中检索并插入映射的线程会获得另一个。当刷新线程需要做它的事情时,它只是交换两者的角色。就个人而言,我认为我更喜欢第一种——消除地图周围的所有锁定具有很大的吸引力,至少对我来说是这样。

保持这种简单性的另一个变体是队列->映射线程创建映射,填充它,当它足够满时(即,在适当的时间长度之后)将它填充到另一个队列中,然后从头开始重复(即,创建新地图等) 刷新线程从其传入队列中检索地图,将其刷新到磁盘,然后将其销毁。尽管这会增加一些创建和销毁地图的开销,但您这样做的频率并不足以引起很多关注。您仍然可以随时保持对任何地图的单线程访问,并且仍然保持所有数据库访问与其他所有内容隔离。

于 2012-04-26T15:34:21.567 回答