15

创建多处理/ GUI 编码系统的最佳方法是什么?

我想为互联网社区创建一个地方来寻找有关如何multiprocessing在 python 中使用该模块的示例。

multiprocessing在互联网上看到了几个在主模块中调用的简单全局函数的过程的小例子,但我发现这很少能轻易地转化为任何人实际对 GUI 所做的任何事情。我认为许多程序将具有他们想要在单独进程中使用的功能作为对象的方法(可能是其他对象的聚合等),并且可能单个 GUI 元素将具有需要调用它的关联对象过程等

例如,我有一个相对复杂的程序,我在为其获取响应式 GUI 时遇到问题,我认为这是由于我multiprocessingQThread. 但是,我知道下面给出的示例至少会以我想要的方式在进程之间传递信息(由于能够执行print语句),但我的 GUI 仍然处于锁定状态。有谁知道可能是什么原因造成的,如果我对多线程/多处理架构缺乏了解,这仍然是一个问题?

这是我正在做的一个小的伪代码示例:

class Worker:
    ...
    def processing(self, queue):
        # put stuff into queue in a loop

# This thread gets data from Worker
class Worker_thread(QThread):
    def __init__(self):
        ...
        # make process with Worker inside
    def start_processing(self):
        # continuously get data from Worker
        # send data to Tab object with signals/slots

class Tab(QTabWidget):
    # spawn a thread separate from main GUI thread

    # update GUI using slot
    def update_GUI()

这段代码是完全可编译的例子,它体现了我的程序的覆盖结构:

from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time

# This object can hold several properties which will be used for the processing
# and will be run in the background, while it updates a thread with all of it's progress
class Worker:
    def __init__(self, some_var):
        self.some_var = some_var
        self.iteration = 0

    def some_complex_processing(self, queue):
        for i in range(0,5000):
            self.iteration += 1
            queue.put(self.iteration)
        queue.put('done with processing')

# This Woker_thread is a thread which will spawn a separate process (Worker).
# This separate is needed in order to separate the data retrieval
# from the main GUI thread, which should only quickly update when needed 
class Worker_thread(QtCore.QThread):
    # signals and slots are used to communicate back to the main GUI thread
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, parent, worker):
        QtCore.QThread.__init__(self, parent)
        self.queue = mp.Queue()
        self.worker = worker
        self.parent = parent
        self.process = mp.Process(target=self.worker.some_complex_processing, args=(self.queue,))

    # When the process button is pressed, this function will start getting data from Worker
    # this data is then retrieved by the queue and pushed through a signal
    # to Tab.update_GUI
    @QtCore.pyqtSlot()
    def start_computation(self):
        self.process.start()
        while(True):
            try:
                message = self.queue.get()
                self.update_signal.emit(message)
            except EOFError:
                pass
            if message == 'done with processing':
                self.done_signal.emit()
                break
            #self.parent.update_GUI(message)
        self.process.join()
        return

# Each tab will start it's own thread, which will spawn a process
class Tab(QtGui.QTabWidget):
    start_comp = QtCore.pyqtSignal()
    def __init__(self, parent, this_worker):
        self.parent = parent
        self.this_worker = this_worker
        QtGui.QTabWidget.__init__(self, parent)

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        self.thread = Worker_thread(parent=self, worker=self.this_worker)
        self.thread.update_signal.connect(self.update_GUI)
        self.thread.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.thread.start_computation)
        self.thread.start()

    ###############################
    # Here is what should update the GUI at every iteration of Worker.some_complex_processing()
    # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        self.step.setText(0, str(iteration))
        #time.sleep(0.1)
        print iteration

    def start_signal_emit(self):
        self.start_comp.emit()

# GUI stuff
class MainWindow(QtGui.QMainWindow):
    def __init__(self, parent = None):
        QtGui.QMainWindow.__init__(self)
        self.tab_list = []
        self.setTabShape(QtGui.QTabWidget.Rounded)
        self.centralwidget = QtGui.QWidget(self)
        self.top_level_layout = QtGui.QGridLayout(self.centralwidget)

        self.tabWidget = QtGui.QTabWidget(self.centralwidget)
        self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)

        process_button = QtGui.QPushButton("Process")
        self.top_level_layout.addWidget(process_button, 0, 1)
        QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)

        self.setCentralWidget(self.centralwidget)
        self.centralwidget.setLayout(self.top_level_layout)

        # Make Tabs in loop from button
        for i in range(0,10):
            name = 'tab' + str(i)
            self.tab_list.append(Tab(self.tabWidget, Worker(name)))
            self.tabWidget.addTab(self.tab_list[-1], name)

    # Do the processing
    def process(self):
        for tab in self.tab_list:
            tab.start_signal_emit()
        return

if __name__ == "__main__":
    app = QtGui.QApplication([])
    win = MainWindow()
    win.show()
    sys.exit(app.exec_())

更多信息: 我正在编写一个程序,我想从中产生几个进程,并让它们在整个处理过程中不断显示它们的进度。我希望程序是多处理的,以便尽可能地从程序中获得最佳速度。

目前,我正在尝试使用线程来生成进程并使用信号和插槽来更新 GUI,同时队列不断检索数据。使用语句时似乎queuessignals和工作,但无法更新 GUI。如果有人对我应该如何构建它以使程序更易于管理有任何其他建议,我想学习。slotsprint

编辑:我已经做出了 Min Lin 提出的调整,另外还做Worker了一个QObject这样moveToThread()
这是我目前拥有的新代码:

from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time

class Worker(QtCore.QObject):
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, some_var):
        QtCore.QObject.__init__(self, parent=None)
        self.some_var = some_var
        self.iteration = 0
        self.queue = mp.Queue()
        self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))

    def some_complex_processing(self, queue):
        for i in range(0,5000):
            self.iteration += 1
            queue.put(self.iteration)
        queue.put('done with processing')

    @QtCore.pyqtSlot()
    def start_computation(self):
        self.process.start()
        while(True):
            try:
                message = self.queue.get()
                self.update_signal.emit(message)
            except EOFError:
                pass
            if message == 'done with processing':
                self.done_signal.emit()
                break
        self.process.join()
        return



class Tab(QtGui.QTabWidget):
    start_comp = QtCore.pyqtSignal()
    def __init__(self, parent, this_worker):
        self.parent = parent
        self.this_worker = this_worker
        QtGui.QTabWidget.__init__(self, parent)

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        # Use QThread is enough
        self.thread = QtCore.QThread();
        # Change the thread affinity of worker to self.thread.
        self.this_worker.moveToThread(self.thread);
        self.this_worker.update_signal.connect(self.update_GUI)
        self.this_worker.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.this_worker.start_computation)
        self.thread.start()

    ###############################
    # Here is what should update the GUI at every iteration of Worker.some_complex_processing()
    # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        self.step.setText(0, str(iteration))
        #time.sleep(0.1)
        print iteration

    def start_signal_emit(self):
        self.start_comp.emit()

# GUI stuff
class MainWindow(QtGui.QMainWindow):
    def __init__(self, parent = None):
        QtGui.QMainWindow.__init__(self)
        self.tab_list = []
        self.setTabShape(QtGui.QTabWidget.Rounded)
        self.centralwidget = QtGui.QWidget(self)
        self.top_level_layout = QtGui.QGridLayout(self.centralwidget)

        self.tabWidget = QtGui.QTabWidget(self.centralwidget)
        self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)

        process_button = QtGui.QPushButton("Process")
        self.top_level_layout.addWidget(process_button, 0, 1)
        QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)

        self.setCentralWidget(self.centralwidget)
        self.centralwidget.setLayout(self.top_level_layout)

        # Make Tabs in loop from button
        for i in range(0,10):
            name = 'tab' + str(i)
            self.tab_list.append(Tab(self.tabWidget, Worker(name)))
            self.tabWidget.addTab(self.tab_list[-1], name)

    # Do the processing
    def process(self):
        for tab in self.tab_list:
            tab.start_signal_emit()
        return

if __name__ == "__main__":
    app = QtGui.QApplication([])
    win = MainWindow()
    win.show()
    sys.exit(app.exec_())

感谢您提供所有答案,我很欣赏每个人在描述他们认为是解决方案的想法时所采用的详细程度,但不幸的是,我还没有能够执行对他们所属的对象进行操作的这些类型的过程在 GUI 上显示对象的属性时。
但是,我从这篇文章中学到了很多东西,这让我意识到我目前拥有的线程版本正在挂起 GUI,因为 GUI 更新功能太大并且需要太多处理。

所以,我采用了QTimer()多线程版本的方法,它的性能要好得多!我会建议任何面临类似问题的人至少尝试类似的事情。

我不知道这种解决 GUI 更新问题的方法,现在它是对我所面临问题的伪或临时修复。

4

4 回答 4

8

GUI 应用程序非常适合测试东西,因为它很容易产生新任务并可视化正在发生的事情,所以我写了一个小示例应用程序(屏幕截图,代码如下),因为我确实想自己学习它。

起初,我采取了与您类似的方法,尝试实现消费者/生产者模式,我在后台进程中苦苦挣扎,执行无限循环以等待新工作,并为自己负责来回通信。然后我发现了接口,然后我可以用几行代码替换所有可怕的代码。您只需要一个池和一些回调:

#!/usr/bin/env python3
import multiprocessing, time, random, sys
from PySide.QtCore import * # equivalent: from PyQt4.QtCore import *
from PySide.QtGui import *   # equivalent: from PyQt4.QtGui import *

def compute(num):
    print("worker() started at %d" % num)
    random_number = random.randint(1, 6)
    if random_number in (2, 4, 6):
        raise Exception('Random Exception in _%d' % num)
    time.sleep(random_number)
    return num

class MainWindow(QMainWindow):
    def __init__(self):
        QMainWindow.__init__(self)
        self.toolBar = self.addToolBar("Toolbar")
        self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
        self.list = QListWidget()
        self.setCentralWidget(self.list)

        # Pool of Background Processes
        self.pool = multiprocessing.Pool(processes=4)

    def addTask(self):
        num_row = self.list.count()
        self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult,
                              error_callback=self.receiveException)
        item = QListWidgetItem("item %d" % num_row)
        item.setForeground(Qt.gray)
        self.list.addItem(item)

    def receiveResult(self, result):
        assert isinstance(result, int)
        print("end_work(), where result is %s" % result)
        self.list.item(result).setForeground(Qt.darkGreen)

    def receiveException(self, exception):
        error = str(exception)
        _pos = error.find('_') + 1
        num_row = int(error[_pos:])
        item = self.list.item(num_row)
        item.setForeground(Qt.darkRed)
        item.setText(item.text() + ' Retry...')
        self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult,
                              error_callback=self.receiveException)

if __name__ == '__main__':
    app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    sys.exit(app.exec_())

编辑:我使用 QTimer 而不是回调做了另一个示例,定期检查队列中的条目,更新 QProgressBar:

#!/usr/bin/env python3
import multiprocessing, multiprocessing.pool, time, random, sys
from PySide.QtCore import *
from PySide.QtGui import *

def compute(num_row):
    print("worker started at %d" % num_row)
    random_number = random.randint(1, 10)
    for second in range(random_number):
        progress = float(second) / float(random_number) * 100
        compute.queue.put((num_row, progress,))
        time.sleep(1)
    compute.queue.put((num_row, 100))

def pool_init(queue):
    # see http://stackoverflow.com/a/3843313/852994
    compute.queue = queue

class MainWindow(QMainWindow):
    def __init__(self):
        QMainWindow.__init__(self)
        self.toolBar = self.addToolBar("Toolbar")
        self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
        self.table = QTableWidget()
        self.table.verticalHeader().hide()
        self.table.setColumnCount(2)
        self.setCentralWidget(self.table)

        # Pool of Background Processes
        self.queue = multiprocessing.Queue()
        self.pool = multiprocessing.Pool(processes=4, initializer=pool_init, initargs=(self.queue,))

        # Check for progress periodically
        self.timer = QTimer()
        self.timer.timeout.connect(self.updateProgress)
        self.timer.start(2000)

    def addTask(self):
        num_row = self.table.rowCount()
        self.pool.apply_async(func=compute, args=(num_row,))
        label = QLabel("Queued")
        bar = QProgressBar()
        bar.setValue(0)
        self.table.setRowCount(num_row + 1)
        self.table.setCellWidget(num_row, 0, label)
        self.table.setCellWidget(num_row, 1, bar)

    def updateProgress(self):
        if self.queue.empty(): return
        num_row, progress = self.queue.get() # unpack
        print("received progress of %s at %s" % (progress, num_row))
        label = self.table.cellWidget(num_row, 0)
        bar = self.table.cellWidget(num_row, 1)
        bar.setValue(progress)
        if progress == 100:
            label.setText('Finished')
        elif label.text() == 'Queued':
            label.setText('Downloading')
        self.updateProgress() # recursion

if __name__ == '__main__':
    app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    sys.exit(app.exec_())
于 2013-04-08T09:03:34.890 回答
4

非常感谢您发布这个问题,以及所有贡献者的意见。它为我试验 PyQt 和多处理提供了一个有用的脚手架。

我从问题中列出的第二个代码示例开始。我的更改和评论:

  • 在 Windows 上,所有参数都Process.__init__()必须是可腌制的。您将看到 @valmynd 将他/她的compute函数设为顶级函数,正是出于这个原因。这部分是因为多处理将重新导入包含目标函数的模块。为了提醒自己这一点,我尝试将目标函数放入它自己的模块中(并确保任何信息都作为可腌制参数传递)。我已将复杂的处理功能移至其自己的名为workermodule.py.
  • 如果在复杂的处理功能中没有足够的工作,则循环完成得太快,以至于无法在 GUI 中看到任何更改。因此,我在复杂的处理函数中添加了一些额外的(无用的)工作。如评论中所述,您可以time.sleep,但稍微点亮所有内核会更令人满意。
  • 通过以下两个代码片段,我得到了一个平滑的 GUI,迭代值不断更新,子流程全速运行。
  • 请注意,self.process可以使用两个队列作为参数创建,一个用于输入,一个用于输出。然后复杂的处理功能将不得不定期检查输入队列的数据。

工人模块.py:

import time

def some_complex_processing(queue):
    iteration = 0
    for i in range(0,5000):
        iteration += 1
        queue.put(iteration)
        #time.sleep(20e-3) # ms
        # You could time.sleep here to simulate a
        # long-running process, but just to prove
        # that we're not cheating, let's make this
        # process work hard, while the GUI process
        # should still have smooth drawing.
        for x in range(100000):
            y = x
    queue.put('done with processing')

主文件.py:

from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import sys
import workermodule

class Worker(QtCore.QObject):
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, some_var):
        QtCore.QObject.__init__(self, parent=None)
        self.some_var = some_var
        self.queue = mp.Queue()
        self.process = mp.Process(
            target=workermodule.some_complex_processing,
            args=(self.queue,)
            )

    @QtCore.pyqtSlot()
    def start_computation(self):
        self.process.start()
        while True:
            try:
                message = self.queue.get()
                self.update_signal.emit(message)
            except EOFError:
                pass
            if message == 'done with processing':
                self.done_signal.emit()
                break
        self.process.join()
        return

class Tab(QtGui.QTabWidget):
    start_comp = QtCore.pyqtSignal()
    def __init__(self, parent, this_worker):
        self.parent = parent
        self.this_worker = this_worker
        QtGui.QTabWidget.__init__(self, parent)

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        # Use QThread is enough
        self.thread = QtCore.QThread();
        # Change the thread affinity of worker to self.thread.
        self.this_worker.moveToThread(self.thread);
        self.this_worker.update_signal.connect(self.update_GUI)
        self.this_worker.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.this_worker.start_computation)
        self.thread.start()

    ###############################
    # Here is what should update the GUI at every iteration of Worker.some_complex_processing()
    # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        self.step.setText(0, str(iteration))
        print iteration

    def start_signal_emit(self):
        self.start_comp.emit()

# GUI stuff
class MainWindow(QtGui.QMainWindow):
    def __init__(self, parent = None):
        QtGui.QMainWindow.__init__(self)
        self.tab_list = []
        self.setTabShape(QtGui.QTabWidget.Rounded)
        self.centralwidget = QtGui.QWidget(self)
        self.top_level_layout = QtGui.QGridLayout(self.centralwidget)

        self.tabWidget = QtGui.QTabWidget(self.centralwidget)
        self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)

        process_button = QtGui.QPushButton("Process")
        self.top_level_layout.addWidget(process_button, 0, 1)
        QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)

        self.setCentralWidget(self.centralwidget)
        self.centralwidget.setLayout(self.top_level_layout)

        # Make Tabs in loop from button
        for i in range(0,10):
            name = 'tab' + str(i)
            self.tab_list.append(Tab(self.tabWidget, Worker(name)))
            self.tabWidget.addTab(self.tab_list[-1], name)

    # Do the processing
    def process(self):
        for tab in self.tab_list:
            tab.start_signal_emit()
        return

if __name__ == "__main__":
    app = QtGui.QApplication([])
    win = MainWindow()
    win.show()
    sys.exit(app.exec_())
于 2013-10-10T12:44:50.200 回答
3

你在 Worker_Thread 中所做的一切都应该已经移到了 Worker 中。Qt 根据创建对象的位置为每个对象分配线程亲和性。Worker_Thread 对象是在主线程中创建的,因此它具有主线程亲和性。如果来自主线程的信号连接到在主线程中创建的对象的插槽,则该插槽也将在主线程中执行。(无论是 QueuedConnection 还是 DirectConnection)。插槽挡住了 GUI。

做这个:

class Worker:
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, some_var):
        self.some_var = some_var
        self.iteration = 0
        self.queue = mp.Queue()
        self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))

    def some_complex_processing(self, queue):
        for i in range(0,5000):
            self.iteration += 1
            queue.put(self.iteration)
        queue.put('done with processing')

    @QtCore.pyqtSlot()
    def start_computation(self):
        self.process.start()
        while(True):
            try:
                message = self.queue.get()
                self.update_signal.emit(message)
            except EOFError:
                pass
            if message == 'done with processing':
                self.done_signal.emit()
                break
        self.process.join()
        return



class Tab(QtGui.QTabWidget):
    start_comp = QtCore.pyqtSignal()
    def __init__(self, parent, this_worker):
        self.parent = parent
        self.this_worker = this_worker
        QtGui.QTabWidget.__init__(self, parent)

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        # Use QThread is enough
        self.thread = QtCore.QThread();
        # Change the thread affinity of worker to self.thread.
        self.this_worker.moveToThread(self.thread);
        self.this_worker.update_signal.connect(self.update_GUI)
        self.this_worker.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.this_worker.start_computation)
        self.thread.start()

    ###############################
    # Here is what should update the GUI at every iteration of Worker.some_complex_processing()
    # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        self.step.setText(0, str(iteration))
        #time.sleep(0.1)
        print iteration

    def start_signal_emit(self):
        self.start_comp.emit()
于 2013-04-06T02:56:38.903 回答
2

好的,我对 Qt 本身并不熟悉,但我用 Tkinter 做过类似的工作。我很确定您在这里遇到了 Python 的 Global Interpreter Lock。具体来说,您在同一个线程中启动队列和 GUI 应用程序,因此当队列阻塞等待输入时,GUI 也会阻塞。尝试app = QtGui.QApplication([])在它自己的线程或进程中启动。用队列编写 GUI 总是有点复杂,而且我发现它通常需要比我开始预期的多至少 1 层线程。

于 2013-04-05T18:07:09.760 回答