我正在用多线程编写生产者-消费者问题的变体。我正在尝试使用队列来存储“生产”的项目,直到它们稍后被“消费”。我的问题是,当消费者线程运行时,它只处理添加到队列中的最新项目(而不是队列中最旧的项目)。此外,它会重复处理该项目(直到队列本身的项目数量)。
我认为我的问题可能是当我将一个项目推入队列时我需要分配一些内存(虽然不确定)。但是,当该项目即将被消耗时,我需要一种方法来引用此内存。
无论如何,这是我的程序的配对版本。我意识到我在这里发布的内容是不完整的(这是一个无限循环),但我只是想展示与这个问题相关的部分。queue_push() 和 queue_pop() 函数都经过了很好的测试,所以我认为问题不在于那里。如果需要,我会发布更多。
谁能看到为什么我的消费者线程只处理最新的队列项?谢谢!
sem_t mutex;
queue q;
FILE* inputFPtr[10];
char host_in[BUFFERSIZE];
char host_out[BUFFERSIZE];
void* p(void* inputFile) {
while (fscanf(inputFile, INPUTFS, host_in) > 0)
{
sem_wait(&mutex);
queue_push(&q, host_in); //this function pushes the hostname onto the back of the queue
fprintf(stdout, "Produced: %d) %s\n", i, host_in);
sem_post(&mutex);
}
fclose (inputFile);
}
void* c() {
while (TRUE)
{
sem_wait(&mutex);
sprintf(hostname_out, "%s", (char *) queue_pop(&q));
printf("%s\n", host_out);
sem_post(&mutex);
}
}
int main (int argc, char* argv[]) {
int i;
pthread_t *th_in[argc-2];
pthread_t *th_out[2];
for (i = 0; i < (argc-2); i++) {
th_in[i] = (pthread_t *) malloc(sizeof(pthread_t));
inputFPtr[i] = fopen(argv[i+1], "r");
pthread_create (th_in[i], NULL, p, inputFPtr[i]);
}
for (i = 0; i < 2; i++) {
th_out[i] = (pthread_t *) malloc(sizeof(pthread_t));
pthread_create (th_out[i], NULL, c, null);
}
for (i = 0; i < (argc - 2); i++) {
pthread_join(*th_in[i], 0);
free(th_in[i]);
}
for (i = 0; i < (2); i++) {
pthread_join(*th_out[i], 0);
free(th_out[i]);
}
return EXIT_SUCCESS;
}