I am building a multithreaded application.
I have set up a thread pool [a Queue of size N and N workers that get data from the queue].
When all tasks are done, I use
tasks.join()
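where tasks is the queue.
The application seems to run smoothly until, at some point (after 20 minutes, for example), it suddenly terminates with the error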
thread.error: can't start new thread
Any ideas?
Edit: The threads are daemon threads, and the code looks like this:
while True:
    t0 = time.time()
    keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(100)
    if keyword_statuses.count() == 0:
        DBSession.commit()
        break

    for kw_status in keyword_statuses:
        kw_status.status = 1
        DBSession.commit()

    t0 = time.time()
    w = SWorker(threads_no=32, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')
    w.work()

print 'finished'
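When do the daemon threads get killed? When the application finishes, or when work() finishes?
Here are the thread pool and the worker (they come from a recipe):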
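A minimal sketch of the behavior this question is about, written independently of the code in this post. The `consume` function and `worker_alive_after_join` helper are hypothetical names introduced only for illustration. It assumes a worker loop that blocks on `get()` forever, and checks whether such a daemon thread is still alive after `join()` on the queue has returned. The import is written to work on both Python 2 and Python 3.

```python
import threading
import time
try:
    from queue import Queue      # Python 3
except ImportError:
    from Queue import Queue      # Python 2, as used in this post


def consume(tasks):
    """Hypothetical worker loop: blocks on get() forever and never returns."""
    while True:
        func, arg = tasks.get()
        func(arg)
        tasks.task_done()


def worker_alive_after_join():
    """Run five tasks, wait on the queue, then report the worker's state."""
    tasks = Queue()
    results = []
    worker = threading.Thread(target=consume, args=(tasks,))
    worker.daemon = True         # only affects what happens at interpreter exit
    worker.start()
    for i in range(5):
        tasks.put((results.append, i))
    tasks.join()                 # returns once every task has been marked done
    time.sleep(0.1)              # give the thread a chance to exit, if it were going to
    return worker.is_alive(), results
```

In this sketch `Queue.join()` waits only for the queued tasks, not for the thread. The daemon flag lets the interpreter exit without waiting for the thread; it does not end the thread when the work is done.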
from Queue import Queue
from threading import Thread, Event, current_thread
import time

event = Event()

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        '''Start processing tasks from the queue'''
        while True:
            event.wait()
            #time.sleep(0.1)
            try:
                func, args, callback = self.tasks.get()
            except Exception, e:
                print str(e)
                return
            else:
                if callback is None:
                    func(args)
                else:
                    callback(func(args))
                self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, args=None, callback=None):
        '''Add a task to the queue'''
        self.tasks.put((func, args, callback))

    def wait_completion(self):
        '''Wait for completion of all the tasks in the queue'''
        self.tasks.join()

    def broadcast_block_event(self):
        '''blocks running threads'''
        event.clear()

    def broadcast_unblock_event(self):
        '''unblocks running threads'''
        event.set()

    def get_event(self):
        '''returns the event object'''
        return event
Could the problem also be that I create the SWorker objects in a loop? What happens to the old SWorker objects (are they garbage collected)?
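If the worker threads of each old pool do stay alive, one possible way to test that hypothesis is to give the pool an explicit shutdown and watch the thread count across iterations. The sketch below is not the recipe above and not SWorker: `ClosablePool`, `_STOP`, `close` and `run_batches` are hypothetical names, and the sentinel-based shutdown is an assumption about how such a pool could be stopped.

```python
import threading
try:
    from queue import Queue      # Python 3
except ImportError:
    from Queue import Queue      # Python 2, as used in this post

_STOP = object()  # hypothetical sentinel telling a worker to exit


class ClosablePool(object):
    """Hypothetical pool whose worker threads can be shut down explicitly."""

    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        self.threads = []
        for _ in range(num_threads):
            t = threading.Thread(target=self._run)
            t.daemon = True
            t.start()
            self.threads.append(t)

    def _run(self):
        while True:
            item = self.tasks.get()
            try:
                if item is _STOP:
                    return       # leave the loop so the thread can end
                func, args = item
                func(args)
            finally:
                self.tasks.task_done()

    def add_task(self, func, args=None):
        self.tasks.put((func, args))

    def close(self):
        """Wait for pending tasks, then stop and join every worker."""
        self.tasks.join()
        for _ in self.threads:
            self.tasks.put(_STOP)    # one sentinel per worker
        for t in self.threads:
            t.join()


def run_batches(batches, num_threads=4):
    """Create one pool per batch, close it, and record the live thread count."""
    counts = []
    for _ in range(batches):
        pool = ClosablePool(num_threads)
        for i in range(10):
            pool.add_task(abs, -i)
        pool.close()
        counts.append(threading.active_count())
    return counts
```

With the pool closed at the end of each batch, `threading.active_count()` should stay constant from one iteration to the next. Logging the same value inside the `while True:` loop of the real application would show whether the number of threads grows with every new SWorker.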