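I am writing a producer-consumer application. The producer threads start fine, but when I try to start the consumer threads I get an exception. Here is the relevant code: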
    #Producer threads
    for i in range(nThreads):
        self.producerThreads.append(MailThread(i, settings.MAX_EMAILS_PERPASS, self.toFetchQueue, self.rawEmailsQueue, self.stopEvent))
        self.producerThreads[i].start()
        logging.info('Started producer thread %d', i)
    #Consumer threads
    #for i in range(settings.MAX_CONS_THREADS):
    try:
        self.consumerThreads.append(ProcessThread(i, settings.STORE_DIRECTORY, settings.DELETE_ONPIPE, self.rawEmailsQueue, self.stopEvent))
        self.consumerThreads[i].start()
        logging.info('Started consumer thread %d', i)
    except Exception, e:
        logging.error('Failed to start consumer thread %s', str(e))
Here is the consumer class:
    import logging, commands, threading, uuid, os, settings, Queue

    class ProcessThread(threading.Thread):
        """
        Class to process the emails.
        """
        def __init__(self, threadCount, storeDirectory, deleteOnPipe, rawEmailsQueue, stopEvent):
            self.threadCount = threadCount
            self.rawEmailsQueue = rawEmailsQueue
            self.stopEvent = stopEvent
            self.storeDirectory = storeDirectory
            self.deleteOnPipe = deleteOnPipe
            threading.Thread.__init__(self)

        def run(self):
            logging.info('Run process for consumer thread %d', self.threadCount)
            while not self.stopEvent.is_set():
                try:
                    emailContainer = rawEmailsQueue.get(False)
                    logging.debug('Got a new email')
                except Queue.Empty:
                    logging.debug('No emails in queue, going to sleep for a while')
                    sleep(0.1)
                    continue
                # rest of the processing code
The indentation may not have survived posting here; it is correct in my actual code.
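
For comparison, here is a minimal, self-contained sketch of the consumer side. It is not the original application and rests on several assumptions. It uses Python 3 naming (`queue` instead of `Queue`). It restores the commented-out loop so that `i` is the consumer's own index rather than a value left over from the producer loop, which could otherwise make `self.consumerThreads[i]` raise `IndexError`. It reads the queue through `self.rawEmailsQueue` and calls `time.sleep`, since the posted `run` refers to the bare names `rawEmailsQueue` and `sleep`. The `start_consumers` helper and the `processed` list are hypothetical stand-ins for the real startup code and email processing.

```python
import logging
import queue      # this module is named Queue in Python 2
import threading
import time


class ProcessThread(threading.Thread):
    """Consumer sketch: takes items off the queue until the stop event is set."""

    def __init__(self, threadCount, rawEmailsQueue, stopEvent, processed):
        threading.Thread.__init__(self)
        self.threadCount = threadCount
        self.rawEmailsQueue = rawEmailsQueue
        self.stopEvent = stopEvent
        self.processed = processed  # hypothetical: stands in for real processing

    def run(self):
        logging.info('Run process for consumer thread %d', self.threadCount)
        while not self.stopEvent.is_set():
            try:
                # The queue is an instance attribute, so it needs the self. prefix.
                emailContainer = self.rawEmailsQueue.get(False)
            except queue.Empty:
                # Nothing to do yet: back off briefly, then re-check the stop event.
                time.sleep(0.1)
                continue
            self.processed.append(emailContainer)


def start_consumers(nConsumers, rawEmailsQueue, stopEvent, processed):
    """Hypothetical helper: start nConsumers threads, each with its own index."""
    consumerThreads = []
    for i in range(nConsumers):
        consumerThreads.append(ProcessThread(i, rawEmailsQueue, stopEvent, processed))
        consumerThreads[i].start()
        logging.info('Started consumer thread %d', i)
    return consumerThreads
```

With the loop in place, `consumerThreads[i]` always refers to the thread that was just appended, regardless of how many producer threads were started earlier.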