3

这已经讨论了很多很多次了,但我仍然没有很好地掌握如何最好地实现这一点。

假设我有两个线程:一个主应用线程和一个工作线程。主应用程序线程(比如说它是一个 WXWidgets GUI 线程,或者一个正在循环并在控制台上接受用户输入的线程)可能有理由停止工作线程 - 用户正在关闭应用程序,单击停止按钮,一些错误发生在主线程中,无论如何。

通常建议是设置一个标志,线程经常检查以确定是否退出。但是,我对解决此问题的建议方法有两个问题:

首先,在我的代码中不断地检查标志会使我的代码非常难看,而且由于大量的代码重复,它非常非常容易出现问题。举个例子:

def WorkerThread():

    while (True):
        doOp1() # assume this takes say 100ms.
        if (exitThread == True): 
            safelyEnd()
            return
        doOp2() # this one also takes some time, say 200ms
        if (exitThread == True): 
            safelyEnd()
            return
        if (somethingIsTrue == True):
            doSomethingImportant()
            if (exitThread == True): return
            doSomethingElse()
            if (exitThread == True): return 
        doOp3() # this blocks for an indeterminate amount of time - say, it's waiting on a network respond
        if (exitThread == True): 
            safelyEnd()
            return
        doOp4() # this is doing some math
        if (exitThread == True): 
            safelyEnd()
            return
        doOp5() # This calls a buggy library that might block forever.  We need a way to detect this and kill this thread if it's stuck for long enough...
        saveSomethingToDisk() # might block while the disk spins up, or while a network share is accessed...whatever
        if (exitThread == True): 
            safelyEnd()
            return


def safelyEnd():
    cleanupAnyUnfinishedBusiness() # do whatever is needed to get things to a workable state even if something was interrupted
    writeWhatWeHaveToDisk() # it's OK to wait for this since it's so important

如果我添加更多代码或更改代码,我必须确保我在所有地方都添加了这些检查块。如果我的工作线程是一个非常长的线程,我可以轻松地进行数十甚至数百次这样的检查。很麻烦。

想想其他的问题。如果 doOp4() 确实意外死锁,我的应用程序将永远旋转并且永远不会退出。用户体验不好!

使用守护线程也不是一个好的选择,因为它剥夺了我执行safelyEnd()代码的机会。这段代码可能很重要——刷新磁盘缓冲区、为调试目的写入日志数据等。

其次,我的代码可能会调用阻止我没有机会经常检查的函数。假设这个函数存在,但它在我无权访问的代码中 - 比如说库的一部分:

def doOp4():
    time.sleep(60) # imagine that this is a network thread, that waits for 60 seconds for a reply before returning.

如果该超时为 60 秒,即使我的主线程发出结束线程的信号,它仍然可能会在那里停留 60 秒,此时停止等待网络响应并退出是完全合理的。但是,如果该代码是我没有编写的库的一部分,那么我无法控制它是如何工作的。

即使我确实为网络检查编写了代码,我基本上也必须对其进行重构,这样它就不会等待 60 秒,而是循环 60 次并在检查退出线程之前等待 1 秒!再次,非常混乱!

所有这一切的结果是,感觉能够轻松实现这一点的好方法是以某种方式导致特定线程上的异常。如果我能做到这一点,我可以将整个工作线程的代码包装在一个 try 块中,并将safelyEnd()代码放在异常处理程序中,甚至是一个finally块中。

有没有办法做到这一点,或者用不同的技术重构这段代码,让事情顺利进行?理想情况下,当用户请求退出时,我们希望让他们等待尽可能少的时间。似乎必须有一种简单的方法来实现这一点,因为这在应用程序中是很常见的事情!

大多数线程通信对象不允许这种类型的设置。他们可能允许以一种更简洁的方式来设置退出标志,但它仍然不能消除不断检查退出标志的需要,并且它仍然不会处理由于外部调用或因为它只是在一个繁忙的循环。

对我来说最重要的是,如果我有一个很长的工作线程过程,我必须用数百次检查标志来乱扔它。这看起来太乱了,感觉不是很好的编码实践。一定有更好的方法...

任何建议将不胜感激。

4

2 回答 2

3

首先,您可以通过使用异常来减少冗长和重复,而无需从外部将异常引发到线程中的能力,或任何其他新技巧或语言特性:

def WorkerThread():
    class ExitThreadError(Exception):
        pass
    def CheckEnd():
        if exitThread:
            raise ExitThreadError()

    try:
        while True:
            doOp1() # assume this takes say 100ms.
            CheckEnd()
            doOp2() # this one also takes some time, say 200ms
            CheckEnd()
            # etc.
    except ExitThreadError:
        safelyEnd()

请注意,你真的应该exitThreadLockor来保护Condition——这是结束检查的另一个好理由,所以你只需要在一个地方修复它。

无论如何,我已经去掉了一些过多的括号、== True检查等,这些对代码没有任何添加;希望您仍然可以看到它与原始版本的等效性。


你可以通过将你的函数重组为一个简单的状态机来更进一步;那么你甚至不需要例外。我将展示一个可笑的微不足道的例子,不管怎样,每个状态总是隐式地转换到下​​一个状态。对于这种情况,重构显然是合理的;对于您的真实代码是否合理,只有您才能真正判断。

def WorkerThread():
    states = (doOp1, doOp2, doOp3, doOp4, doOp5)
    current = 0
    while not exitThread:
        states[current]()
        current += 1
    safelyEnd()

这些都不能帮助你在你的一个步骤中间打断。

如果你有一个需要 60 秒的函数,而你对此无能为力,那么在这 60 秒内就无法取消你的线程,你也无能为力。就是那样子。

但通常,需要 60 秒的事情实际上是在做一些事情,比如在 a 上阻塞select你可以做一些事情——创建一个管道,将它的读取端粘贴到 中,然后在另一端写入以唤醒线程。select

或者,在你感觉很糟糕时,通常只是关闭/删除/等。函数正在等待/处理/以其他方式使用的文件或其他对象通常会保证它快速失败并出现异常。当然有时它保证了一个段错误,或者损坏的数据,或者有 50% 的机会退出和 50% 的机会永远挂起,或者……所以,即使你无法控制doOp4功能,你最好能够分析它的来源和/或白盒测试它。

如果最坏的情况变得最糟,那么是的,您必须将一个 60 秒的超时更改为 60 个 1 秒的超时。但通常不会到那个地步。


最后,如果您确实需要能够杀死线程,请不要使用线程,使用子进程。那些是可以杀死的。

只需确保您的进程始终处于可以安全杀死它的状态,或者,如果您只关心 Unix,请使用 USR 信号并在进程不处于可安全杀死状态时将其屏蔽。

但是,如果在 60 秒的调用中间终止进程是不安全的,那么这对您doOp4并没有真正的帮助,因为在这 60 秒内您仍然无法终止它。

在某些情况下,您可以让子进程安排父进程在它意外终止时进行清理,或者甚至安排在下次运行时对其进行清理(例如,考虑典型的数据库日志)。

但最终,你所要求的最终是一个矛盾:你想硬杀一个线程而不给它机会完成它正在做的事情,但你想保证它完成它正在做的事情,而你没有想要重写代码以使其成为可能。因此,您需要重新考虑您的设计,以便它需要一些并非不可能的东西。

于 2013-10-08T00:32:15.430 回答
1

如果你不介意你的代码运行慢十倍,你可以使用Thread2下面实现的类。下面的示例显示了调用新stop方法应如何在下一条字节码指令中终止线程。实现一个清理系统作为练习留给读者完成。

import threading
import sys

class StopThread(StopIteration): pass

threading.SystemExit = SystemExit, StopThread

class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

################################################################################

import time

def main():
    test = Thread2(target=printer)
    test.start()
    time.sleep(1)
    test.stop()
    test.join()

def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)

if __name__ == '__main__':
    main()

该类Thread3的代码运行速度似乎比Thread2该类快大约 33%。

于 2014-09-05T14:59:09.663 回答