2

我正在用 python 开发一个多线程应用程序。我有以下情况。

  1. 有 2-3 个生产者线程与 DB 通信并以大块的形式获取一些数据并将它们填充到队列中
  2. 有一个中间工作者,它将生产者线程获取的大块分成较小的块,并将它们填充到另一个队列中。
  3. 有 5 个消费者线程使用中间工作线程创建的队列。
  4. 生产者线程通过其 API 访问数据源的对象。这些数据源是完全独立的。所以这些生产者只了解数据源对象应该给出的数据的存在或不存在。
  5. 我创建了这三种类型的线程,并通过在它们上调用 join() 使主线程等待这些线程的完成。

现在对于这样的设置,我想要一个通用的错误处理程序,它可以感知任何线程的失败、任何异常并决定要做什么。例如,如果我在启动应用程序后按 ctrl+c,主线程会死掉,但生产者、消费者线程继续运行。我希望一旦按下 ctrl+c 整个应用程序就应该关闭。同样,如果数据源模块中发生某些 DB 错误,则应通知生产者线程。

这是我到目前为止所做的:

我创建了一个类 ThreadManager,它的对象被传递给所有线程。我编写了一个错误处理程序方法并将其传递给sys.excepthook. 这个处理程序应该捕获异常、错误,然后它应该调用 ThreadManager 类的方法来控制正在运行的线程。这是片段:

class Producer(threading.Thread):
    ....
    def produce():
        data = dataSource.getData()

class DataSource:
    ....
    def getData():
        raise Exception("critical")

def customHandler(exceptionType, value, stackTrace):
     print "In custom handler"

sys.excepthook = customHandler

现在当生产者类的线程调用 DataSource 类的 getData() 时,会抛出异常。但是这个异常永远不会被我的 customHandler 方法捕获。

我错过了什么?同样在这种情况下,我还可以应用哪些其他策略?请帮忙。感谢您有足够的耐心阅读所有这些:)

4

2 回答 2

1

你需要的是一个装饰器。本质上,您正在修改原始函数并放入 try-except :

def exception_decorator(func):
    def _function(*args):
        try:
            result = func(*args)
        except:
            print('*** ESC default handler ***')
            os._exit(1)
        return result
    return _function

如果您的线程函数称为 myfunc,则在函数定义上方添加以下行

@exception_decorator
def myfunc():
    pass;
于 2013-07-19T19:11:27.353 回答
0

您不能在按 Ctrl+C 时捕获“KeyboardInterrupt”并执行以下操作:

for thread in threading.enumerate():
    thread._Thread__stop()
    thread._Thread__delete()
while len(threading.enumerate()) > 1:
    time.sleep(1)
os._exit(0)

并且在每个线程类中都有一个标志 self.alive 理论上你可以调用 thread.alive = False 并让它优雅地停止?

for thread in threading.enumerate():
    thread.alive = False
    time.sleep(5) # Grace period
    thread._Thread__stop()
    thread._Thread__delete()
while len(threading.enumerate()) > 1:
    time.sleep(1)
os._exit(0)

例子:

import os
from threading import *
from time import sleep

class worker(Thread):
    def __init__(self):
        self.alive = True
        Thread.__init__(self)
        self.start()
    def run(self):
        while self.alive:
            sleep(0.1)

runner = worker()

try:
    raw_input('Press ctrl+c!')
except:
    pass
for thread in enumerate():
    thread.alive = False
    sleep(1)
    try:
        thread._Thread__stop()
        thread._Thread__delete()
    except:
        pass
# There will always be 1 thread alive and that's the __main__ thread.
while len(enumerate()) > 1:
    sleep(1)
os._exit(0)

尝试通过更改内部系统异常处理程序来解决它?

import sys
origExcepthook = sys.excepthook
def uberexcept(exctype, value, traceback):
    if exctype == KeyboardInterrupt:
        print "Gracefully shutting down all the threads"
        # enumerate() thingie here.
    else:
        origExcepthook(exctype, value, traceback)
sys.excepthook = uberexcept
于 2012-12-03T10:38:36.987 回答