0

我正在编写一个生产者-消费者应用程序。生产者线程启动正常,但是当我尝试启动消费者线程时,出现异常。以下是相关代码:

#Producer threads
    for i in range(nThreads):
        self.producerThreads.append(MailThread(i, settings.MAX_EMAILS_PERPASS, self.toFetchQueue, self.rawEmailsQueue, self.stopEvent))
        self.producerThreads[i].start()
        logging.info('Started producer thread %d', i)

    #Consumer threads
    #for i in range(settings.MAX_CONS_THREADS):
    try:
        self.consumerThreads.append(ProcessThread(i, settings.STORE_DIRECTORY, settings.DELETE_ONPIPE, self.rawEmailsQueue, self.stopEvent))
        self.consumerThreads[i].start()
        logging.info('Started consumer thread %d', i)
    except Exception, e:
        logging.error('Failed to start consumer thread %s', str(e))

这是消费类:

import logging, commands, threading, uuid, os, settings, Queue

class ProcessThread(threading.Thread):
"""
Class to process the emails. 
"""
def __init__(self, threadCount, storeDirectory, deleteOnPipe, rawEmailsQueue, stopEvent):
    self.threadCount = threadCount
    self.rawEmailsQueue = rawEmailsQueue
    self.stopEvent = stopEvent
    self.storeDirectory = storeDirectory
    self.deleteOnPipe = deleteOnPipe
    threading.Thread.__init__(self)

def run(self):
    logging.info('Run process for consumer thread %d', self.threadCount)

    while not self.stopEvent.is_set():
        try:
            emailContainer = rawEmailsQueue.get(False)
            logging.debug('Got a new email')

        except Queue.Empty:
            logging.debug('No emails in queue, going to sleep for a while')
            sleep(0.1)
            continue

#剩下的处理代码

我无法获得正确的缩进,这在我的代码中很好

4

1 回答 1

0

这是一个愚蠢的错误(可能是因为我经常使用 PHP 进行编码)。我已经像这样初始化了数组:

self.producerThreads = self.consumerThreads = []

两个数组都引用相同的内存。

于 2012-10-30T10:19:43.600 回答