2

我有一个工作线程处理工作项队列。我刚刚实现了第二个工作人员来处理插入的项目worker1Invalid reads但是,我在使用 Valgrind 时遇到了一些问题。

我假设这是因为struct foo我传递给worker2()在主线程中的某个点被释放。本质上struct foo是一个不断更新的结构(malloc/free),但是,我想worker2foo.

我的问题是:是否可以尽快worker2停止处理?并在何时重新开始?我不确定用线程将丢失的项目插入的最佳方法是什么?任何反馈表示赞赏。struct fooNULLcreate_foo()foo

//canonical form
//producer
void push_into_queue(char *item)
{
    pthread_mutex_lock(&queueMutex);
    if (workQueue.full) {       // full }
        else
        {
            add_item_into_queue(item);
            pthread_cond_signal(&queueSignalPush);
        }
        pthread_mutex_unlock(&queueMutex);
    }
}

// consumer1
void *worker1(void *arg)
{
    while (true) {
        pthread_mutex_lock(&queueMutex);
        while (workQueue.empty)
            pthread_cond_wait(&queueSignalPush, &queueMutex);

        item = workQueue.front; // pop from queue
        add_item_into_list(item);

        pthread_cond_broadcast(&queueSignalPop);
        pthread_mutex_unlock(&queueMutex);
    }
    return NULL;
}

pthread_create(&thread1, NULL, (void *) &worker, NULL);

// consumer2
void *worker2(void *arg)
{
    my_struct *foo = (my_struct *) arg;
    while (true) {
        pthread_mutex_lock(&queueMutex);
        while (list.empty)
            pthread_cond_wait(&queueSignalPop, &queueMutex);

        for (i = 0; i < list.size; i++)
            insert_item_into_foo(list[i].item, foo);
        pthread_cond_broadcast(&queueSignalPop);
        pthread_mutex_unlock(&queueMutex);
    }
    return NULL;
}

void create_foo()
{
    my_struct *foo = calloc(10, sizeof(my_struct));
    pthread_create(&thread2, NULL, (void *) &worker2, foo);
}

void free_foo()
{
    pthread_mutex_lock(&queueMutex);
    int i;
    for (i=0; i<5; i++)
       free(foo[i].list->string);
    free(foo[i].list);
    free(foo);
    pthread_mutex_unlock(&queueMutex);
}
4

2 回答 2

1

您没有为worker1和定义任何终止条件worker2。我想 eol offoo可以这样考虑。这意味着两个工人都必须foo通过拥有对它的引用来监视 的存在(即 a foo **)。

void *worker2(void *arg)
{
    my_struct **foo = (my_struct **) arg;
    while(true) {
        pthread_mutex_lock(&queueMutex);
        while (list.empty)
            pthread_cond_wait(&queueSignalPop, &queueMutex);
        if (NULL == *foo) 
            break;
        for (i = 0; i < list.size; i++)
            insert_item_into_foo(list[i].item, *foo);
        pthread_cond_broadcast(&queueSignalPop);
        pthread_mutex_unlock(&queueMutex);
    }
    free(foo);
    return NULL;
}

void create_foo()
{
    my_struct *foo = calloc(10, sizeof(my_struct ));
    my_struct **foo_ptr = malloc(1, sizeof(my_struct *));
    *foo_ptr = foo;
    pthread_create(&thread2, NULL, (void *) &worker2, foo_ptr);
   // more work with foo
}

请注意,foo必须以某种方式将其分配给不同的变量以便可以访问free_foo(您的代码假设这个事实没有明确显示它 - 因此我在末尾的评论create_foo)。

使用上面的代码,每个实例都worker2拥有一个在其整个生命周期中依赖的指针,并且在退出之前必须处理好它。

更新:

也许更好的解决方案是将一个thread2包含foo指针的结构传递给 ,以及一个指示该指针是否仍然有效的标志。您可以在 struct ad lib 中添加线程所需的任何其他信息。

struct th2_data {
    enum {RUNNING, TERMINATING} state;
    my_struct *foo;
};

然后分配该结构的一个实例,将其初始化为{RUNNING, foo},并将其传递给thread2。在某处保留其地址的副本,以便能够TERMINATINGthread2. 实际上,正如您在评论中所问的那样,您必须将if (NULL == *foo)thread2 中的测试替换为if (foo.state == TERMINATING).

于 2013-05-15T09:36:46.793 回答
1

使 foo 全局并在循环中添加一些指针检查。
下次调用 create_foo 时,它会重新启动线程。

my_struct *foo = NULL;

//消费者2 无效*worker2(无效*arg) {

while (true) { if ( fool == NULL ) return; pthread_mutex_lock(&queueMutex); while (list.empty) pthread_cond_wait(&queueSignalPop, &queueMutex); for (i = 0; i < list.size; i++) insert_item_into_foo(list[i].item, foo); pthread_cond_broadcast(&queueSignalPop); pthread_mutex_unlock(&queueMutex); } return NULL;

}

于 2013-05-15T03:13:28.720 回答