68

我有一个带有“开始”按钮和进度条的小型 GUI 测试。期望的行为是:

  • 点击开始
  • 进度条振荡 5 秒
  • 进度条停止

观察到的行为是“开始”按钮冻结 5 秒,然后显示进度条(无振荡)。

到目前为止,这是我的代码:

class GUI:
    def __init__(self, master):
        self.master = master
        self.test_button = Button(self.master, command=self.tb_click)
        self.test_button.configure(
            text="Start", background="Grey",
            padx=50
            )
        self.test_button.pack(side=TOP)

    def progress(self):
        self.prog_bar = ttk.Progressbar(
            self.master, orient="horizontal",
            length=200, mode="indeterminate"
            )
        self.prog_bar.pack(side=TOP)

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        # Simulate long running process
        t = threading.Thread(target=time.sleep, args=(5,))
        t.start()
        t.join()
        self.prog_bar.stop()

root = Tk()
root.title("Test Button")
main_ui = GUI(root)
root.mainloop()

根据 Bryan Oakley here提供的信息,我知道我需要使用线程。我尝试创建一个线程,但我猜测由于线程是从主线程中启动的,所以它没有帮助。

我的想法是将逻辑部分放在不同的类中,并从该类中实例化 GUI,类似于 A. Rodas此处的示例代码。

我的问题:

我不知道如何对其进行编码,以便此命令:

self.test_button = Button(self.master, command=self.tb_click)

调用位于另一个类中的函数。这是一件坏事还是有可能?我将如何创建可以处理 self.tb_click 的第二类?我尝试遵循 A. Rodas 的示例代码,该代码运行良好。但是我无法弄清楚如何在触发动作的 Button 小部件的情况下实现他的解决方案。

如果我应该从单个 GUI 类中处理线程,如何创建一个不干扰主线程的线程?

4

4 回答 4

70

当您在主线程中加入新线程时,它将等待线程完成,因此即使您使用多线程,GUI 也会阻塞。

如果要将逻辑部分放在不同的类中,可以直接子类化Thread,然后在按下按钮时启动该类的新对象。这个 Thread 子类的构造函数可以接收一个 Queue 对象,然后您将能够与 GUI 部分进行通信。所以我的建议是:

  1. 在主线程中创建一个 Queue 对象
  2. 创建一个可以访问该队列的新线程
  3. 定期检查主线程中的队列

然后你必须解决如果用户点击两次同一个按钮会发生什么的问题(每次点击都会产生一个新线程),但你可以通过禁用开始按钮并在调用后再次启用它来修复它self.prog_bar.stop()

import queue

class GUI:
    # ...

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        self.queue = queue.Queue()
        ThreadedTask(self.queue).start()
        self.master.after(100, self.process_queue)

    def process_queue(self):
        try:
            msg = self.queue.get_nowait()
            # Show result of the task if needed
            self.prog_bar.stop()
        except queue.Empty:
            self.master.after(100, self.process_queue)

class ThreadedTask(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        time.sleep(5)  # Simulate long running process
        self.queue.put("Task finished")
于 2013-05-25T08:17:41.277 回答
6

我将提交替代解决方案的基础。它本身并不特定于 Tk 进度条,但它当然可以很容易地实现。

这里有一些类允许您在 Tk 的后台运行其他任务,在需要时更新 Tk 控件,并且不会锁定 gui!

这是 TkRepeatingTask 和 BackgroundTask 类:

import threading

class TkRepeatingTask():

    def __init__( self, tkRoot, taskFuncPointer, freqencyMillis ):
        self.__tk_   = tkRoot
        self.__func_ = taskFuncPointer        
        self.__freq_ = freqencyMillis
        self.__isRunning_ = False

    def isRunning( self ) : return self.__isRunning_ 

    def start( self ) : 
        self.__isRunning_ = True
        self.__onTimer()

    def stop( self ) : self.__isRunning_ = False

    def __onTimer( self ): 
        if self.__isRunning_ :
            self.__func_() 
            self.__tk_.after( self.__freq_, self.__onTimer )

class BackgroundTask():

    def __init__( self, taskFuncPointer ):
        self.__taskFuncPointer_ = taskFuncPointer
        self.__workerThread_ = None
        self.__isRunning_ = False

    def taskFuncPointer( self ) : return self.__taskFuncPointer_

    def isRunning( self ) : 
        return self.__isRunning_ and self.__workerThread_.isAlive()

    def start( self ): 
        if not self.__isRunning_ :
            self.__isRunning_ = True
            self.__workerThread_ = self.WorkerThread( self )
            self.__workerThread_.start()

    def stop( self ) : self.__isRunning_ = False

    class WorkerThread( threading.Thread ):
        def __init__( self, bgTask ):      
            threading.Thread.__init__( self )
            self.__bgTask_ = bgTask

        def run( self ):
            try :
                self.__bgTask_.taskFuncPointer()( self.__bgTask_.isRunning )
            except Exception as e: print repr(e)
            self.__bgTask_.stop()

这是一个 Tk 测试,它演示了这些的使用。如果您想查看实际演示,只需将其附加到包含这些类的模块底部即可:

def tkThreadingTest():

    from tkinter import Tk, Label, Button, StringVar
    from time import sleep

    class UnitTestGUI:

        def __init__( self, master ):
            self.master = master
            master.title( "Threading Test" )

            self.testButton = Button( 
                self.master, text="Blocking", command=self.myLongProcess )
            self.testButton.pack()

            self.threadedButton = Button( 
                self.master, text="Threaded", command=self.onThreadedClicked )
            self.threadedButton.pack()

            self.cancelButton = Button( 
                self.master, text="Stop", command=self.onStopClicked )
            self.cancelButton.pack()

            self.statusLabelVar = StringVar()
            self.statusLabel = Label( master, textvariable=self.statusLabelVar )
            self.statusLabel.pack()

            self.clickMeButton = Button( 
                self.master, text="Click Me", command=self.onClickMeClicked )
            self.clickMeButton.pack()

            self.clickCountLabelVar = StringVar()            
            self.clickCountLabel = Label( master,  textvariable=self.clickCountLabelVar )
            self.clickCountLabel.pack()

            self.threadedButton = Button( 
                self.master, text="Timer", command=self.onTimerClicked )
            self.threadedButton.pack()

            self.timerCountLabelVar = StringVar()            
            self.timerCountLabel = Label( master,  textvariable=self.timerCountLabelVar )
            self.timerCountLabel.pack()

            self.timerCounter_=0

            self.clickCounter_=0

            self.bgTask = BackgroundTask( self.myLongProcess )

            self.timer = TkRepeatingTask( self.master, self.onTimer, 1 )

        def close( self ) :
            print "close"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass            
            self.master.quit()

        def onThreadedClicked( self ):
            print "onThreadedClicked"
            try: self.bgTask.start()
            except: pass

        def onTimerClicked( self ) :
            print "onTimerClicked"
            self.timer.start()

        def onStopClicked( self ) :
            print "onStopClicked"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass                        

        def onClickMeClicked( self ):
            print "onClickMeClicked"
            self.clickCounter_+=1
            self.clickCountLabelVar.set( str(self.clickCounter_) )

        def onTimer( self ) :
            print "onTimer"
            self.timerCounter_+=1
            self.timerCountLabelVar.set( str(self.timerCounter_) )

        def myLongProcess( self, isRunningFunc=None ) :
            print "starting myLongProcess"
            for i in range( 1, 10 ):
                try:
                    if not isRunningFunc() :
                        self.onMyLongProcessUpdate( "Stopped!" )
                        return
                except : pass   
                self.onMyLongProcessUpdate( i )
                sleep( 1.5 ) # simulate doing work
            self.onMyLongProcessUpdate( "Done!" )                

        def onMyLongProcessUpdate( self, status ) :
            print "Process Update: %s" % (status,)
            self.statusLabelVar.set( str(status) )

    root = Tk()    
    gui = UnitTestGUI( root )
    root.protocol( "WM_DELETE_WINDOW", gui.close )
    root.mainloop()

if __name__ == "__main__": 
    tkThreadingTest()

关于BackgroundTask,我将强调两个重点:

1)您在后台任务中运行的函数需要一个函数指针,它将调用和尊重,这允许任务在中途取消 - 如果可能的话。

2)您需要确保退出应用程序时后台任务已停止。如果您不解决该问题,即使您的 gui 已关闭,该线程仍将运行!

于 2017-01-29T22:29:53.353 回答
5

问题是 t.join() 阻塞了点击事件,主线程没有回到事件循环来处理重绘。请参阅为什么 ttk Progressbar 在 Tkinter 中的进程后出现TTK 进度条在发送电子邮件时被阻止

于 2013-05-25T01:32:41.803 回答
4

我使用了 RxPY,它有一些不错的线程函数,以相当干净的方式解决了这个问题。没有队列,并且我提供了一个在后台线程完成后在主线程上运行的函数。这是一个工作示例:

import rx
from rx.scheduler import ThreadPoolScheduler
import time
import tkinter as tk

class UI:
   def __init__(self):
      self.root = tk.Tk()
      self.pool_scheduler = ThreadPoolScheduler(1) # thread pool with 1 worker thread
      self.button = tk.Button(text="Do Task", command=self.do_task).pack()

   def do_task(self):
      rx.empty().subscribe(
         on_completed=self.long_running_task, 
         scheduler=self.pool_scheduler
      )

   def long_running_task(self):
      # your long running task here... eg:
      time.sleep(3)
      # if you want a callback on the main thread:
      self.root.after(5, self.on_task_complete)

   def on_task_complete(self):
       pass # runs on main thread

if __name__ == "__main__":
    ui = UI()
    ui.root.mainloop()

使用这种结构的另一种方法可能更清洁(取决于偏好):

tk.Button(text="Do Task", command=self.button_clicked).pack()

...

def button_clicked(self):

   def do_task(_):
      time.sleep(3) # runs on background thread
             
   def on_task_done():
      pass # runs on main thread

   rx.just(1).subscribe(
      on_next=do_task, 
      on_completed=lambda: self.root.after(5, on_task_done), 
      scheduler=self.pool_scheduler
   )
于 2020-09-02T03:56:57.277 回答