4

我正在尝试同时使用多处理和 matplotlib。

我正在创建一个标准Pool,使用 的回调函数添加工作apply_async并更新 GUI apply_async,该函数在 Pool 的父进程上运行(我用 验证了这一点os.getpid())。例子 :

from pylab import *
from numpy import *
from numpy.random import random
from multiprocessing import Pool

# Output image
global out_all
out_all = zeros((256, 256))

# Only does something to in_image, doesn't access anything else
def do_work(in_image):
    for x in xrange(100000):
        out_image = in_image[::-1, ::-1]
    return out_image

# Update the output image and display if needed
def do_update(out_image):
    global out_all
    print ("Updating")
    out_all += out_image
    clf()
    imshow(out_all)
    show()

# Input images (close enough to what I do as well)
work = [random((256, 256)) for f in range(20)]

# Don't block when showing something
ion()

# Do the work
print "Starting pool"
pool = Pool()
for o in work:
    pool.apply_async(do_work, [o], callback=do_update).get()
pool.close()
pool.join()
print "Stopping pool"

# Block
ioff()
show()
print "Done"

处理本身工作正常,进程真的被破坏了pool.join(),但是 Matplotlib(和 TK,我猜)一旦我尝试做某事,即使只退出程序,也会抱怨:

Traceback (most recent call last):
  File "test_thread.py", line 27, in <module>
    show()
  File "/usr/lib/pymodules/python2.7/matplotlib/pyplot.py", line 139, in show
    _show(*args, **kw)
  File "/usr/lib/pymodules/python2.7/matplotlib/backend_bases.py", line 83, in __call__
    manager.show()
  File "/usr/lib/pymodules/python2.7/matplotlib/backends/backend_tkagg.py", line 444, in show
    self.canvas.draw_idle()
  File "/usr/lib/pymodules/python2.7/matplotlib/backends/backend_tkagg.py", line 258, in draw_idle
    self._idle_callback = self._tkcanvas.after_idle(idle_draw)
  File "/usr/lib/python2.7/lib-tk/Tkinter.py", line 512, in after_idle
    return self.after('idle', func, *args)
  File "/usr/lib/python2.7/lib-tk/Tkinter.py", line 504, in after
    name = self._register(callit)
  File "/usr/lib/python2.7/lib-tk/Tkinter.py", line 1101, in _register
    self.tk.createcommand(name, f)
RuntimeError: main thread is not in main loop
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/pymodules/python2.7/matplotlib/_pylab_helpers.py", line 82, in destroy_all
    manager.destroy()
  File "/usr/lib/pymodules/python2.7/matplotlib/backends/backend_tkagg.py", line 452, in destroy
    self.canvas._tkcanvas.after_cancel(self.canvas._idle_callback)
  File "/usr/lib/python2.7/lib-tk/Tkinter.py", line 519, in after_cancel
    data = self.tk.call('after', 'info', id)
RuntimeError: main thread is not in main loop
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/pymodules/python2.7/matplotlib/_pylab_helpers.py", line 82, in destroy_all
    manager.destroy()
  File "/usr/lib/pymodules/python2.7/matplotlib/backends/backend_tkagg.py", line 452, in destroy
    self.canvas._tkcanvas.after_cancel(self.canvas._idle_callback)
  File "/usr/lib/python2.7/lib-tk/Tkinter.py", line 519, in after_cancel
    data = self.tk.call('after', 'info', id)
RuntimeError: main thread is not in main loop

我的第一个想法是每个 TK 上下文都重复fork(),这以某种方式干扰了主流程中的 TK 循环,但我没有在我的工作人员中做任何与 TK 相关的事情。有任何想法吗?

4

1 回答 1

6

错误消息参考Tkinter。所以看起来你正在使用 TkAgg 后端。下面的代码是特定于 TkAgg/Tkinter 的。特别是调用

win.after(100, animate)

使用了 Tkinter 特有的after方法。GtkAgg/PyGtk 有类似的调用,其他后端也有类似的调用。但我只想强调,接下来的内容是特定于 TkAgg/Tkinter 的。


Tkinter 旨在在单个线程中运行。也就是说,所有 Tkinter GUI 调用都应该源自单个线程(通常,不一定是主线程)。

Pool 的回调方法在主进程apply_async的单独 ( ) 线程中运行。_handle_results由于imshow()是从池的_handle_results线程show()中调用并在主线程中调用的,因此Tkinter抱怨

RuntimeError: main thread is not in main loop

在这种情况下,我看不到使用apply_async回调的方法。

相反,我们可以做的是安排do_work放入out_image一个multiprocessing.Queue()(我out_queue在下面的代码中调用它)。然后,我们将让主进程的主线程轮询此队列中的项目,并在它们从队列中出来时显示它们。此轮询在animate下面的函数中完成。


plt.ion()仅用于交互式会话。尽管有时可以编写一些似乎可以使用的小脚本,但如果您拒绝在脚本中使用,而是编写尊重 GUI 框架事件循环的代码plt.ion(),您将获得更好的结果和更清晰的 GUI 。plt.ion()

虽然可能可以修复您的脚本并使用plt.ion(),因为这不是编写 matplotlib 脚本的推荐方式,让我们看看我们是否可以避免这样做。


plt.show()告诉 Tkinter 运行它的事件循环。请注意,一旦进行此调用,就会绘制 GUI 窗口,您可以单击按钮、放大和缩小等。

不知何故,我们需要在这个事件循环中注入一个函数,由事件循环定期运行,并与可能发生的所有其他 GUI 事件协作。我们希望这个函数检查我们的任何工作子进程是否有输出给我们,如果有,更新 imshow 图像。

使用 TkAgg/Tkinter,注入这样一个函数的方法是

win = fig.canvas.manager.window
win.after(100, animate)

这将告诉 Tkinteranimate在(大约)100 毫秒后运行该函数(一次)。由于我们希望该函数animate定期运行,我们只需粘贴另一个

win.after(100, animate)

结束时调用animate


import matplotlib as mpl
mpl.use('TkAgg')
import matplotlib.pyplot as plt
import numpy as np
import multiprocessing as mp
import logging
import Queue
logger = mp.log_to_stderr(logging.INFO)

# Only does something to in_image, doesn't access anything else
def do_work(in_image):
    logger.info('Processing in_image')
    for x in xrange(100000):
        out_image = in_image[::-1, ::-1]
    out_queue.put(out_image)

# Update the output image and display if needed
out_all = np.zeros((256, 256))


def pool_initializer(out_queue_):
    # Setup out_queue as a global variable *in the worker subprocesses*
    global out_queue
    out_queue = out_queue_


def animate():
    global out_all
    try:
        out_image = out_queue.get_nowait()
    except Queue.Empty:
        pass
    else:
        logger.info("Updating")
        out_all += out_image
        im.set_data(out_all)
        fig.canvas.draw()  # redraw the canvas
    win.after(100, animate)

if __name__ == '__main__':
    out_queue = mp.Queue()
    logger.info("Starting pool")
    pool = mp.Pool(initializer=pool_initializer, initargs=(out_queue, ))
    work = [np.random.random((256, 256)) for f in range(20)]
    for o in work:
        pool.apply_async(do_work, [o])
    pool.close()

    fig, ax = plt.subplots()
    win = fig.canvas.manager.window
    # Output image
    im = plt.imshow(out_all, vmin=0, vmax=1)

    # Register a function to be run once
    win.after(100, animate)
    plt.show()
    logger.info("Done")
于 2013-04-15T13:29:14.977 回答