
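I want all threads to read from the same struct. In the past I did this by creating the threads inside the loop that reads from the struct, but this time I need to walk the struct inside the "dowork" function, as shown in my example.

I have the following code: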

struct word_list {
    char word[20];
    struct word_list * next;
};

struct word_list * first_word = NULL;
//the other function, which loads data into the struct, is omitted because it's not relevant


//in main()
pthread_t thread_id[MAX_THREADS];
int max_thread = 10;
for(t = 0 ; t < max_thread; t++)
{
    pthread_mutex_lock(&thrd_list);
    arg_struct *args = calloc(1, sizeof(*args));
    args->file = file;
    args->t = t;
    args->e = ex;
    pthread_mutex_unlock(&thrd_list);
    if(pthread_create(&thread_id[t],NULL,dowork,args) != 0)
    {
        free(args);  // the thread never started; don't leak its arguments
        t--;
        fprintf(stderr,RED "\nError in creating thread\n" NONE);
    }
}

for(t = 0 ; t < max_thread; t++)
    if(pthread_join(thread_id[t],NULL) != 0)
    {
        fprintf(stderr,RED "\nError in joining thread\n" NONE);
    }




void *dowork(void *arguments)
{
    struct word_list * curr_word = first_word;
    char myword[20];
    while( curr_word != NULL )
    {
        pthread_mutex_lock(&thrd_list);
        strncpy(myword,curr_word->word,sizeof(myword) - 1);
        myword[sizeof(myword) - 1] = '\0';  // strncpy does not always null-terminate
        pthread_mutex_unlock(&thrd_list);

        //some irrelevant code is omitted

        pthread_mutex_lock(&thrd_list);
        curr_word = curr_word->next;
        pthread_mutex_unlock(&thrd_list);
    }

    return NULL;
}

How can I have all the threads read different elements of the same struct?


3 Answers


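Let me see if I understand this correctly:

  • struct word_list describes some kind of linked list
  • You want to distribute the elements of that list among the threads.

If that is what you want, then I would simply pop the elements off the list one by one and write the pointer to the remaining elements back: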

struct word_list * volatile first_word = NULL; // important to make it volatile

void *dowork(void *arguments)
{
    struct word_list * curr_word;
    char myword[20];
    do {
        // gain exclusive access to the control structures
        pthread_mutex_lock(&thrd_list);

        // get the next element
        curr_word = first_word;
        if (curr_word == NULL) {
            pthread_mutex_unlock(&thrd_list);
            break;
        }

        // notify the remaining threads what the next element is
        first_word = curr_word->next;

        pthread_mutex_unlock(&thrd_list);

        // do whatever you have to do

    } while (1);

    return NULL;
}

Use a struct word_list * volatile next_word if you don't want to modify first_word. Make sure to make it volatile, otherwise the compiler may perform optimizations that lead to strange results.
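Below is a minimal, self-contained sketch of the pop-one-element-at-a-time approach from this answer. The helpers pop_word and run_workers and the worker_result struct are hypothetical, added only so the example compiles and runs (build with -pthread). Since POSIX specifies that pthread_mutex_lock and pthread_mutex_unlock synchronize memory, this sketch keeps every access to first_word inside the mutex and does not rely on volatile.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct word_list {
    char word[20];
    struct word_list *next;
};

/* Shared list head; only read or written while holding list_lock. */
static struct word_list *first_word = NULL;
static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical per-thread result slot, used only by this sketch. */
struct worker_result { int n_processed; };

/* Pop one node under the mutex; returns NULL once the list is exhausted. */
static struct word_list *pop_word(void)
{
    pthread_mutex_lock(&list_lock);
    struct word_list *node = first_word;
    if (node != NULL)
        first_word = node->next;
    pthread_mutex_unlock(&list_lock);
    return node;
}

static void *dowork(void *arg)
{
    struct worker_result *res = arg;
    struct word_list *node;
    while ((node = pop_word()) != NULL) {
        /* The node now belongs to this thread alone, so no lock is needed here. */
        char myword[20];
        strncpy(myword, node->word, sizeof(myword) - 1);
        myword[sizeof(myword) - 1] = '\0';
        res->n_processed++;
        free(node);
    }
    return NULL;
}

/* Build a list of n_words nodes, let n_threads workers drain it, return the total processed. */
static int run_workers(int n_words, int n_threads)
{
    assert(n_threads > 0 && n_threads <= 64);
    for (int i = 0; i < n_words; ++i) {
        struct word_list *node = malloc(sizeof *node);
        if (node == NULL)
            abort();
        snprintf(node->word, sizeof node->word, "word-%d", i);
        node->next = first_word;   /* no threads exist yet, so no lock needed */
        first_word = node;
    }

    pthread_t tid[64];
    struct worker_result results[64] = {{0}};
    for (int t = 0; t < n_threads; ++t)
        if (pthread_create(&tid[t], NULL, dowork, &results[t]) != 0)
            abort();

    int total = 0;
    for (int t = 0; t < n_threads; ++t) {
        pthread_join(tid[t], NULL);
        total += results[t].n_processed;
    }
    return total;
}
```

For example, run_workers(1000, 8) should return 1000: every node is popped by exactly one thread, although how many each thread gets varies from run to run.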

answered 2013-09-13T15:36:38.393

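So you want to process a large amount of data by splitting the work across several threads. Your solution is not very efficient, because your threads will spend a lot of time fighting over who owns the mutex, and you cannot be sure that the work is spread evenly across all the threads. For example, threads 0 and 1 could end up doing all the work because they get to the mutex first, while all the other threads sit idle the whole time.

If you want to improve performance, you need to do the following:

  • Make all the threads independent of each other, i.e. remove the shared, synchronised data
  • Keep each thread's data contiguous in memory (good locality), i.e. make sure the data for item n+1 sits immediately after the data for item n. This helps the CPU access memory efficiently; jumping around in RAM a lot causes many cache misses, which reduces performance.

So, in your program, instead of a single linked list shared by all the threads, give each thread its own linked list: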

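#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct word_list
{
  char word[20];  // data
  struct word_list *next;
} word_list;

// array sizes need a compile-time constant in C; setting this to the number of CPUs at run time would be better
enum { num_threads = 4 };

word_list 
  *lists [num_threads] = {0};

void ReadWords (FILE *input)
{
  // current [i] points at the link where list i's next item will be stored
  word_list
    **current [num_threads];

  for (int i = 0 ; i < num_threads ; ++i)
  {
    current [i] = &lists [i];
  }

  int destination = 0;
  char buffer [20];

  while (fscanf (input, "%19s", buffer) == 1)  // read some valid input
  {
    word_list *item = malloc (sizeof (word_list));
    if (item == NULL)
      break;
    // set data
    strcpy (item->word, buffer);
    item->next = NULL;

    *current [destination] = item;
    current [destination] = &item->next;

    destination = (destination + 1) % num_threads;
  }

  // data has now been read and stored into a series of linked lists, each list having
  // the same number of items (or one less)
}

void *do_work (void *arg);

void create_threads (void)
{
   pthread_t threads [num_threads];

   for (int i = 0 ; i < num_threads ; ++i)
   {
      // create thread, and pass it the value of lists [i]
      if (pthread_create (&threads [i], NULL, do_work, lists [i]) != 0)
      {
         perror ("pthread_create");
         exit (EXIT_FAILURE);
      }
   }

   for (int i = 0 ; i < num_threads ; ++i)
      pthread_join (threads [i], NULL);
}

void *do_work (void *arg)
{
   for (word_list *item = arg ; item ; item = item->next)
   {
     // process data (item->word)
   }
   return NULL;
}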

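In this program (just typed in, not checked) I create four linked lists and distribute the data evenly across them. Then I create the threads and give each thread one linked list. Each thread then processes its own linked list (they are separate lists).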
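To check the claim that dealing items round-robin leaves every list with the same number of items (or one fewer), here is a trimmed, standalone version of just the distribution step. NUM_LISTS stands in for num_threads, and the int payload plus the distribute and list_length helpers are hypothetical simplifications for this sketch.

```c
#include <assert.h>
#include <stdlib.h>

#define NUM_LISTS 4   /* stands in for num_threads */

typedef struct word_list {
    int value;                 /* placeholder payload for this sketch */
    struct word_list *next;
} word_list;

/* Deal n_items into NUM_LISTS lists round-robin, appending at each list's tail.
   Returns 0 on success, -1 if an allocation fails (nodes already linked are kept). */
static int distribute(word_list *lists[NUM_LISTS], int n_items)
{
    assert(n_items >= 0);
    word_list **tail[NUM_LISTS];        /* tail[i]: the link where list i's next node goes */
    for (int i = 0; i < NUM_LISTS; ++i) {
        lists[i] = NULL;
        tail[i] = &lists[i];
    }

    int destination = 0;
    for (int v = 0; v < n_items; ++v) {
        word_list *node = malloc(sizeof *node);
        if (node == NULL)
            return -1;
        node->value = v;
        node->next = NULL;
        *tail[destination] = node;      /* link the node onto list 'destination' */
        tail[destination] = &node->next;
        destination = (destination + 1) % NUM_LISTS;
    }
    return 0;
}

static int list_length(const word_list *item)
{
    int n = 0;
    for (; item != NULL; item = item->next)
        ++n;
    return n;
}
```

With 10 items and 4 lists, lists 0 and 1 receive 3 items each and lists 2 and 3 receive 2 each; list 1 holds items 1, 5 and 9, in that order.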

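Now each thread can run at full speed without having to wait on a mutex to get its data. Memory access is reasonable, but depends heavily on the allocator. Using an array instead of a linked list would improve this, but you need to know the number of data items before allocating the array, which may not be possible.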
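The array alternative mentioned above can be sketched as follows, assuming the item count is known before the work starts. Each thread gets one contiguous slice, so threads share no mutable state while working and each walks memory sequentially. The names parallel_sum, process_chunk and struct chunk, and the summing workload itself, are hypothetical stand-ins for the real per-item processing.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAX_WORKERS 64

/* One contiguous slice of the shared (read-only) array, owned by a single thread. */
struct chunk {
    const int *data;
    size_t begin, end;   /* half-open range [begin, end) */
    long long sum;       /* placeholder "processing" result */
};

static void *process_chunk(void *arg)
{
    struct chunk *c = arg;
    long long sum = 0;                           /* accumulate locally, write back once */
    for (size_t i = c->begin; i < c->end; ++i)   /* sequential, cache-friendly walk */
        sum += c->data[i];
    c->sum = sum;
    return NULL;
}

/* Split data[0..n) into n_threads contiguous slices whose sizes differ by at most one. */
static long long parallel_sum(const int *data, size_t n, int n_threads)
{
    assert(n_threads > 0 && n_threads <= MAX_WORKERS);
    pthread_t tid[MAX_WORKERS];
    struct chunk chunks[MAX_WORKERS];
    int started[MAX_WORKERS];
    size_t base = n / (size_t)n_threads;
    size_t extra = n % (size_t)n_threads;   /* the first 'extra' slices get one more item */
    size_t start = 0;

    for (int t = 0; t < n_threads; ++t) {
        size_t len = base + ((size_t)t < extra ? 1 : 0);
        chunks[t] = (struct chunk){ data, start, start + len, 0 };
        start += len;
        started[t] = pthread_create(&tid[t], NULL, process_chunk, &chunks[t]) == 0;
        if (!started[t])
            process_chunk(&chunks[t]);      /* fall back to doing this slice inline */
    }

    long long total = 0;
    for (int t = 0; t < n_threads; ++t) {
        if (started[t])
            pthread_join(tid[t], NULL);
        total += chunks[t].sum;
    }
    return total;
}
```

Summing the values 1..100 with 4 or 7 threads gives 5050 either way; only the slice boundaries change.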

answered 2013-09-13T16:05:44.050

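If I understand your requirements now (and I think I finally do), you need to treat your word list as a work queue. That requires a notification mechanism that lets items be "pushed" onto the queue and tells the "pullers" that new data is available. Such a system does exist in pthreads: the marriage of a condition variable, a mutex, and the predicate they manage for flow control.

Here is an example of how to use it. I have tried to document what happens at each step, and I hope it is clear.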

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

// defined the number of threads in our queue and the number
//  of test items for this demonstration.
#define MAX_THREADS  16
#define MAX_ITEMS    (128*1024)

typedef struct word_list
{
    char word[20];
    struct word_list * next;

} word_list;

// predicate values for the word list
struct word_list * first_word = NULL;   // current word.
int word_shutdown = 0;                  // shutdown state

// used for protecting our list.
pthread_mutex_t wq_mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wq_cv = PTHREAD_COND_INITIALIZER;

// worker proc
void *dowork(void*);

int main()
{
    pthread_t thread_id[MAX_THREADS];
    int i=0;

    // start thread pool
    for(i=0; i < MAX_THREADS; ++i)
        pthread_create(thread_id+i, NULL, dowork, NULL);

    // add MAX_ITEMS more entries, we need to latch since the
    //  work threads are actively processing the queue as we go.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);
    pthread_mutex_unlock(&wq_mtx);

    // queue is empty, but threads are all still there waiting. So
    //  do it again, just to prove the pool is still intact.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // again wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);

    // queue is empty, and we're not adding anything else. latch
    //  the mutex, set the shutdown flag, and tell all the threads
    //  they need to terminate.
    word_shutdown = 1;
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_broadcast(&wq_cv);

    for (i=0;i<MAX_THREADS; ++i)
        pthread_join(thread_id[i], NULL);

    return EXIT_SUCCESS;
}


// the work crew will start by locking the mutex, then entering the
//  work loop, looking for entries or a shutdown state
void *dowork(void *arguments)
{
    int n_processed = 0;
    while (1)
    {
        pthread_mutex_lock(&wq_mtx);
        while (first_word == NULL && word_shutdown == 0)
            pthread_cond_wait(&wq_cv, &wq_mtx);

        // we own the mutex, and thus exclusive access to the predicate
        //  values it protects.
        if (first_word != NULL)
        {
            // pull the item off the queue. once we do that we own the
            //  item, so we can unlatch and let another waiter know there
            //  may be more data on the queue.
            word_list *p = first_word;
            first_word = p->next;
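            if (p->next)
                pthread_cond_signal(&wq_cv);
            else
            {
                // the queue just went empty: wake everyone, so main (waiting
                //  for an empty queue on this same condvar) is not left asleep.
                pthread_cond_broadcast(&wq_cv);
            }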
            pthread_mutex_unlock(&wq_mtx);

            //
            // TODO: process item here.
            //
            ++n_processed;
            free(p);
        }
        else if (word_shutdown != 0)
            break;
    }

    // we still own the mutex. report on how many items we received, then
    //  one more signal to let someone (anyone, actually) know we're done.
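    pthread_t self = pthread_self();
    // pthread_t is opaque (a pointer on macOS, an integer on glibc); this
    //  cast is a debugging convenience and not portable to every platform.
    printf("%p : processed %d items.\n", (void *)self, n_processed);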
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_signal(&wq_cv);
    return NULL;
}

Sample output: MAX_THREADS = 4 (your output will vary)

0x100387000 : processed 64909 items.
0x100304000 : processed 64966 items.
0x1000b5000 : processed 64275 items.
0x100281000 : processed 67994 items.

Sample output: MAX_THREADS = 8

0x100304000 : processed 31595 items.
0x1000b5000 : processed 33663 items.
0x100593000 : processed 34298 items.
0x10040a000 : processed 32304 items.
0x10048d000 : processed 32406 items.
0x100387000 : processed 31878 items.
0x100281000 : processed 32317 items.
0x100510000 : processed 33683 items.

Sample output: MAX_THREADS = 16

0x10079f000 : processed 17239 items.
0x101081000 : processed 16530 items.
0x101104000 : processed 16662 items.
0x100699000 : processed 16562 items.
0x10040a000 : processed 16672 items.
0x100593000 : processed 15158 items.
0x10120a000 : processed 17365 items.
0x101187000 : processed 14184 items.
0x100387000 : processed 16332 items.
0x100616000 : processed 16497 items.
0x100281000 : processed 16632 items.
0x100304000 : processed 16222 items.
0x100510000 : processed 17188 items.
0x10048d000 : processed 15367 items.
0x1000b5000 : processed 16912 items.
0x10071c000 : processed 16622 items.

Just because we can, with full global optimization enabled:

Sample output: MAX_THREADS = 32, MAX_ITEMS = 4194304

0x109c58000 : processed 260000 items.
0x109634000 : processed 263433 items.
0x10973a000 : processed 262125 items.
0x10921c000 : processed 261201 items.
0x108d81000 : processed 262325 items.
0x109a4c000 : processed 262318 items.
0x108f8d000 : processed 263107 items.
0x109010000 : processed 261382 items.
0x109946000 : processed 262299 items.
0x109199000 : processed 261930 items.
0x10929f000 : processed 263506 items.
0x109093000 : processed 262362 items.
0x108e87000 : processed 262069 items.
0x108e04000 : processed 261890 items.
0x109acf000 : processed 261875 items.
0x1097bd000 : processed 262040 items.
0x109840000 : processed 261686 items.
0x1093a5000 : processed 262547 items.
0x109b52000 : processed 261980 items.
0x109428000 : processed 264259 items.
0x108f0a000 : processed 261620 items.
0x1095b1000 : processed 263062 items.
0x1094ab000 : processed 261811 items.
0x1099c9000 : processed 262709 items.
0x109116000 : processed 261628 items.
0x109bd5000 : processed 260905 items.
0x10952e000 : processed 262741 items.
0x1098c3000 : processed 260608 items.
0x109322000 : processed 261970 items.
0x1000b8000 : processed 262061 items.
0x100781000 : processed 262669 items.
0x1096b7000 : processed 262490 items.

Hmm. I didn't use volatile for any of these. Must be time to buy a lottery ticket.

Anyway, I suggest doing some research on pthreads, in particular mutex and condition variable control and how the two interact. I hope this helps.
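As one illustration of that interaction, here is a variant of the queue above sketched with two condition variables: one that only the workers wait on and one that only the producer waits on, so a signal meant for a worker can never be absorbed by the main thread waiting for the queue to drain. It also counts in-flight items, so the producer waits until every item has been processed, not merely dequeued. The names q_not_empty, q_drained, q_in_flight, push and run_queue are hypothetical, chosen for this sketch (build with -pthread).

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#define N_WORKERS 4

struct item { int value; struct item *next; };

/* All queue state below is protected by q_mtx. */
static pthread_mutex_t q_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_not_empty = PTHREAD_COND_INITIALIZER; /* workers wait here */
static pthread_cond_t q_drained = PTHREAD_COND_INITIALIZER;   /* producer waits here */
static struct item *q_head = NULL;
static int q_in_flight = 0;    /* items popped but not yet finished */
static int q_shutdown = 0;
static long long q_total = 0;  /* sum of processed values, as a checkable result */

static void *worker(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&q_mtx);
    for (;;) {
        while (q_head == NULL && !q_shutdown)
            pthread_cond_wait(&q_not_empty, &q_mtx);
        if (q_head == NULL)            /* shutdown requested and nothing left */
            break;
        struct item *p = q_head;
        q_head = p->next;
        ++q_in_flight;
        pthread_mutex_unlock(&q_mtx);

        long long v = p->value;        /* "process" the item outside the lock */
        free(p);

        pthread_mutex_lock(&q_mtx);
        q_total += v;
        if (--q_in_flight == 0 && q_head == NULL)
            pthread_cond_signal(&q_drained);   /* only the producer waits on this */
    }
    pthread_mutex_unlock(&q_mtx);
    return NULL;
}

static void push(int value)
{
    struct item *node = malloc(sizeof *node);
    if (node == NULL)
        abort();
    node->value = value;
    pthread_mutex_lock(&q_mtx);
    node->next = q_head;
    q_head = node;
    pthread_mutex_unlock(&q_mtx);
    pthread_cond_signal(&q_not_empty);         /* wake one worker */
}

/* Push 1..n, wait until every item has been processed, then stop the workers. */
static long long run_queue(int n)
{
    assert(n >= 0);
    pthread_t tid[N_WORKERS];
    for (int i = 0; i < N_WORKERS; ++i)
        if (pthread_create(&tid[i], NULL, worker, NULL) != 0)
            abort();

    for (int i = 1; i <= n; ++i)
        push(i);

    pthread_mutex_lock(&q_mtx);
    while (q_head != NULL || q_in_flight > 0)
        pthread_cond_wait(&q_drained, &q_mtx);
    q_shutdown = 1;
    pthread_mutex_unlock(&q_mtx);
    pthread_cond_broadcast(&q_not_empty);      /* every worker must see the shutdown */

    for (int i = 0; i < N_WORKERS; ++i)
        pthread_join(tid[i], NULL);
    return q_total;
}
```

Because the shutdown flag is never reset, run_queue is meant to be called once; run_queue(1000) should return 500500, the sum of 1..1000.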

answered 2013-09-13T18:13:06.157