3

使用 POSIX 线程和 C++,我有一个“插入操作”,一次只能安全地完成一个。

如果我有多个线程等待使用 pthread_join 插入,则在完成时生成一个新线程。他们是否都会一次收到“线程完成”信号并产生多个插入,或者假设首先收到“线程完成”信号的线程将产生一个新线程,阻止其他线程创建新线程。

/* --- GLOBAL --- */
pthread_t insertThread;



/* --- DIFFERENT THREADS --- */
// Wait for Current insert to finish
pthread_join(insertThread, NULL); 

// Done start a new one
pthread_create(&insertThread, NULL, Insert, Data);

感谢您的答复

该程序基本上是一个巨大的哈希表,它通过套接字接收来自客户端的请求。

每个新的客户端连接都会产生一个新线程,然后它可以从中执行多个操作,特别是查找或插入。查找可以并行进行。但是插入需要“重新组合”到一个线程中。您可以说查找操作可以在不为客户端生成新线程的情况下完成,但是它们可能需要一段时间导致服务器锁定,从而丢弃新请求。该设计试图尽可能减少系统调用和线程创建。

但现在我知道我最初认为我应该能够拼凑一些东西的方式并不安全

谢谢

4

9 回答 9

3

来自pthread_join 上的 opengroup.org

指定同一目标线程的多个同时调用 pthread_join() 的结果未定义。

所以,你真的不应该有几个线程加入你以前的 insertThread。

首先,当你使用 C++ 时,我推荐boost.thread。它们类似于线程的 POSIX 模型,并且也适用于 Windows。它可以帮助您使用 C++,即通过使函数对象更容易使用。

第二,你为什么要开始一个新的线程来插入一个元素,而你总是要等待前一个线程完成才能开始下一个线程?似乎不是多线程的经典使用。

虽然......对此的一种经典解决方案是让一个工作线程从事件队列中获取作业,而其他线程将操作发布到事件队列中。

如果您真的只是想或多或少地保持现在的状态,则必须这样做:

  • 创建一个条件变量,例如insert_finished.
  • 所有想要插入的线程都在条件变量上等待。
  • 一旦一个线程完成插入,它就会触发条件变量。
  • 由于条件变量需要互斥体,您可以通知所有等待的线程,它们都想开始插入,但由于一次只有一个线程可以获取互斥体,所有线程将顺序执行插入。

但是您应该注意不要以过于临时的方式实现同​​步。由于这被称为insert,我怀疑您想要操作数据结构,因此您可能希望首先实现线程安全的数据结构,而不是共享数据结构访问和所有客户端之间的同步。我也怀疑会有更多的操作然后只是insert,这需要适当的同步......

于 2009-01-10T15:26:58.717 回答
2

根据 Single Unix Specifcation:“多个同时调用 pthread_join() 指定同一目标线程的结果是未定义的。”

实现单线程获取任务的“正常方式”是设置一个条件变量(不要忘记相关的互斥锁):空闲线程在 pthread_cond_wait()(或 pthread_cond_timedwait())中等待,当线程工作已经完成,它用 pthread_cond_signal() 唤醒一个空闲的。

于 2009-01-10T15:21:30.647 回答
1

是的,因为大多数人建议最好的方法似乎是让工作线程从队列中读取。下面的一些代码片段

    pthread_t       insertThread = NULL;
    pthread_mutex_t insertConditionNewMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t insertConditionDoneMutex    = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  insertConditionNew      = PTHREAD_COND_INITIALIZER;
    pthread_cond_t  insertConditionDone     = PTHREAD_COND_INITIALIZER;

       //Thread for new incoming connection
        void * newBatchInsert()
        {
           for(each Word)
           {
                            //Push It into the queue
                            pthread_mutex_lock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);
                                lexicon[newPendingWord->length - 1]->insertQueue.push(newPendingWord);
                            pthread_mutex_unlock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);

           }

                    //Send signal to worker Thread
                    pthread_mutex_lock(&insertConditionNewMutex);
                        pthread_cond_signal(&insertConditionNew);
                    pthread_mutex_unlock(&insertConditionNewMutex);

                    //Wait Until it's finished
                    pthread_cond_wait(&insertConditionDone, &insertConditionDoneMutex);

        }


            //Worker thread
            void * insertWorker(void *)
            {

                while(1)        
                {

                    pthread_cond_wait(&insertConditionNew, &insertConditionNewMutex);

                    for (int ii = 0; ii < maxWordLength; ++ii)
                    {                   
                            while (!lexicon[ii]->insertQueue.empty())
                            {

                                queueNode * newPendingWord = lexicon[ii]->insertQueue.front();


                                lexicon[ii]->insert(newPendingWord->word);

                                pthread_mutex_lock(&lexicon[ii]->insertQueueMutex);
                                lexicon[ii]->insertQueue.pop();
                                pthread_mutex_unlock(&lexicon[ii]->insertQueueMutex);

                            }

                    }

                    //Send signal that it's done
                    pthread_mutex_lock(&insertConditionDoneMutex);
                        pthread_cond_broadcast(&insertConditionDone);
                    pthread_mutex_unlock(&insertConditionDoneMutex);

                }

            }

            int main(int argc, char * const argv[]) 
            {

                pthread_create(&insertThread, NULL, &insertWorker, NULL);


                lexiconServer = new server(serverPort, (void *) newBatchInsert);

                return 0;
            }
于 2009-01-11T11:29:11.770 回答
0

其他人已经指出这具有未定义的行为。我只想补充一点,完成任务的最简单的方法(只允许一个线程执行部分代码)是使用一个简单的互斥锁 - 您需要执行该代码的线程是互斥的,这就是互斥锁出现的地方它的名字 :-)

如果您需要在特定线程(如 Java AWT)中运行代码,那么您需要条件变量。但是,您应该三思而后行,这种解决方案是否真的有回报。想象一下,如果您每秒调用 10000 次“插入操作”,您需要多少次上下文切换。

于 2009-01-10T16:10:10.567 回答
0

正如您刚才提到的,您正在使用与插入并行的多个查找的哈希表,我建议您检查是否可以使用并发哈希表。

由于当您同时插入元素时,确切的查找结果是不确定的,因此这样的并发散列映射可能正是您所需要的。不过,我没有在 C++ 中使用并发哈希表,但由于它们在 Java 中可用,您肯定会找到一个在 C++ 中执行此操作的库。

于 2009-01-10T16:38:54.440 回答
0

我发现的唯一一个支持插入而不锁定新查找的库 - Sunrise DD(我不确定它是否支持并发插入)

然而,从 Google 的稀疏散列映射转换后,内存使用量增加了一倍多。查找应该很少发生,所以与其尝试编写我自己的库,它结合了两者的优点,我宁愿在安全地进行更改时锁定表挂起查找。

再次感谢

于 2009-01-10T16:56:01.347 回答
0

在我看来,您想将插入序列化到哈希表中。

为此,您需要一个锁 - 而不是产生新线程。

于 2009-01-10T17:27:01.660 回答
0

从您的描述来看,您每次想要插入某些东西时都在重新创建插入线程,因此看起来效率很低。创建线程的成本不是0。

这个问题的一个更常见的解决方案是产生一个在队列上等待的插入线程(即,当循环为空时,它处于一个休眠的循环中)。然后其他线程将工作项添加到队列中。插入线程按照添加顺序(或根据需要按优先级)选择队列中的项目并执行适当的操作。

您所要做的就是确保对队列的添加受到保护,以便一次只有一个线程可以访问修改实际队列,并且插入线程不会进行忙碌等待,而是在队列中没有任何内容时休眠(见条件变量)。

于 2009-01-11T07:49:46.003 回答
0

理想情况下,您不希望单个进程中有多个线程池,即使它们执行不同的操作。线程的可重用性是一个重要的架构定义,如果使用 C,它会导致在主线程中创建 pthread_join。

当然,对于 C++ 线程池(又名 ThreadFactory),其想法是保持线程原语抽象,因此它可以处理传递给它的任何函数/操作类型。

一个典型的例子是一个网络服务器,它将具有连接池和线程池,它们为连接提供服务,然后进一步处理它们,但是,所有这些都是从一个公共线程池进程派生的。

摘要:避免在主线程以外的任何地方使用 PTHREAD_JOIN。

于 2012-01-01T22:25:47.527 回答