1

我有一个小线程池示例供学习使用:

它采用代表两个值的简单计算(+、-、*、/)的工作结构。现在工作线程将从队列中拉出这些工作结构并执行它们。

理论就这么多了。现在的问题是,当我想以某种方式使用辅助函数 submit_work() 时,工作线程中的队列解析不起作用(提取的数据无效,当我使用 QUEUE_REMOVE 时,它会出现分段错误)。

怎么可能仅仅使用一个辅助函数,整个概念就不再起作用了?

池.c

我的意思是手动推送数据工作正常...

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include "../queue/queue.h"

#define MAX_THREADS 1

pthread_t threads[MAX_THREADS];

pthread_cond_t cond;   

pthread_mutex_t mutex;

/**
 * QUEUE is a "void * arr[2]".
 * The Queue itself is similar to the circularly linked list in linux
 */
QUEUE queue;

struct work_s {
    int a;
    int b;
    int type;
    QUEUE node;
};

void * worker();
void submit_work(int a, int b, int type);

int main() {
    QUEUE_INIT(&queue);

    pthread_cond_init(&cond, NULL);
    pthread_mutex_init(&mutex, NULL);

    struct work_s work[2];

    /* 5 + 7 */
    work[0].a = 5;
    work[0].b = 7;
    work[0].type = 1;

    /* 3 x 3 */
    work[1].a = 3;
    work[1].b = 3;
    work[1].type = 3;

    /* initialize there queue nodes */
    QUEUE_INIT(&work[0].node);
    QUEUE_INIT(&work[1].node);

    /* insert both tasks into the work queue */
    QUEUE_INSERT_TAIL(&queue, &work[0].node);
    QUEUE_INSERT_TAIL(&queue, &work[1].node);

    /* this does actually the same as above but causes a segmentation fault. */
    submit_work(5, 6, 3);

    for (int i = 0; i < MAX_THREADS; i++)
        pthread_create(&threads[i], NULL, worker, NULL);
    for (int i = 0; i < MAX_THREADS; i++)
        pthread_join(threads[i], NULL);
    for (int i = 0; i < MAX_THREADS; i++)
        pthread_detach(threads[i]);

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);

    return 0;
}

void submit_work(int a, int b, int type) {
    struct work_s work;

    work.a = a;
    work.b = b;
    work.type = type;

    pthread_mutex_lock(&mutex);

    QUEUE_INIT(&work.node);
    QUEUE_INSERT_TAIL(&queue, &work.node);

    pthread_mutex_unlock(&mutex);

    pthread_cond_signal(&cond);
}

void * worker() { 
    /* a pointer to a queue node */
    QUEUE * q;

    int result;

    struct work_s * work;

    /* infinite loop */
    for (;;) {
        while (QUEUE_EMPTY(&queue)) {
            pthread_cond_wait(&cond, &mutex);
        }

        pthread_mutex_lock(&mutex);

        q = QUEUE_HEAD(&queue);

        /* HERE THE SEGMENTSTION FAULT OCCURS when using submit_work */
        QUEUE_REMOVE(q);

        pthread_mutex_unlock(&mutex);

        /* set the work pointer to the work struct we have pulled from queue */
        work = QUEUE_DATA(q, struct work_s, node);

        /* PRINTS INCORRECT DATA on submit_work() */
        printf("received work type %d with a %d and b %d \n", work->a, work->b, work->type);

        if (work->type == 0) {
            break;
        }

        switch (work->type) {
            case 1:
                result = work->a + work->b;
                printf("%d + %d = %d\n", work->a, work->b, result);
                break;
            case 2:
                result = work->a - work->b;
                printf("%d - %d = %d\n", work->a, work->b, result);
                break;
            case 3:
                result = work->a * work->b;
                printf("%d * %d = %d\n", work->a, work->b, result);
                break;
            case 4:
                result = work->a / work->b;
                printf("%d / %d = %d\n", work->a, work->b, result);
                break;
        }
    }


    pthread_exit(NULL);
}

队列.h

/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#ifndef QUEUE_H_
#define QUEUE_H_

typedef void *QUEUE[2];

/* Private macros. */
#define QUEUE_NEXT(q)       ((*(q))[0])
#define QUEUE_PREV(q)       ((*(q))[1])
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT((QUEUE *) QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV((QUEUE *) QUEUE_NEXT(q)))

/* Public macros. */
#define QUEUE_DATA(ptr, type, field)                                          \
  ((type *) ((char *) (ptr) - ((long) &((type *) 0)->field)))

#define QUEUE_FOREACH(q, h)                                                   \
  for ((q) = (*(h))[0]; (q) != (h); (q) = (*(q))[0])

#define QUEUE_EMPTY(q)                                                        \
  (QUEUE_NEXT(q) == (q))

#define QUEUE_HEAD(q)                                                         \
  (QUEUE_NEXT(q))

#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  }                                                                           \
  while (0)

#define QUEUE_ADD(h, n)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);                                       \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);                                       \
    QUEUE_PREV(h) = QUEUE_PREV(n);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
  }                                                                           \
  while (0)

#define QUEUE_SPLIT(h, q, n)                                                  \
  do {                                                                        \
    QUEUE_PREV(n) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(n) = (n);                                                 \
    QUEUE_NEXT(n) = (q);                                                      \
    QUEUE_PREV(h) = QUEUE_PREV(q);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
    QUEUE_PREV(q) = (n);                                                      \
  }                                                                           \
  while (0)

#define QUEUE_INSERT_HEAD(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = QUEUE_NEXT(h);                                            \
    QUEUE_PREV(q) = (h);                                                      \
    QUEUE_NEXT_PREV(q) = (q);                                                 \
    QUEUE_NEXT(h) = (q);                                                      \
  }                                                                           \
  while (0)

#define QUEUE_INSERT_TAIL(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = (h);                                                      \
    QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(q) = (q);                                                 \
    QUEUE_PREV(h) = (q);                                                      \
  }                                                                           \
  while (0)

#define QUEUE_REMOVE(q)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
  }                                                                           \
  while (0)

#endif /* QUEUE_H_ */

如果我还应该复制 QUEUE.h 文件,请发表评论。否则你在这里找到它

博多

4

2 回答 2

3

问题是您正在submit_work()向队列中插入一个局部变量(队列例程不会复制它们正在排队的对象)。因此,一旦submit_work()返回,排队的结构中的数据就不再有效(并且可能会将链接指针更改为垃圾)。

这不会发生在前两项入队的情况下,因为它们的生命周期在线程函数执行时仍然有效,因为调用main()中的块pthread_join()

您应该对动态分配的项目进行排队,以便它们的生命周期超过将它们排队的函数的生命周期。当然,您还需要具有使项目出队释放内存的功能。当然,这意味着您还应该动态分配由main().

于 2013-06-03T14:26:59.983 回答
2

worker()您在不持有互斥锁的情况下执行此循环:

while (QUEUE_EMPTY(&queue)) {
    pthread_cond_wait(&cond, &mutex);
}

pthread_cond_wait()只能在持有您传递给它的互斥锁时调用 - 它也会在该互斥锁锁定的情况下返回,因此您不应该pthread_mutex_lock(&mutex)在该循环之后立即调用 - 它已经被锁定。

所以那段代码应该看起来像:

pthread_mutex_lock(&mutex);
while (QUEUE_EMPTY(&queue)) {
    pthread_cond_wait(&cond, &mutex);
}

// don't call pthread_mutex_lock(&mutex) here...
于 2013-06-02T09:20:37.330 回答