2

我正在编写一个动画图像数据的脚本。我有许多大型图像立方体(3D 阵列)。对于其中的每一个,我会逐步浏览每个立方体中的框架,一旦接近它的尽头,我就会加载下一个立方体并继续。由于每个立方体的尺寸很大,因此加载时间很长(~5s)。我希望动画在多维数据集之间无缝过渡(同时也节省内存),所以我错开加载过程。我在解决方案方面取得了一些进展,但一些问题仍然存在。

下面的代码加载每个数据立方体,将其拆分为帧并将它们放入multiprocessing.Queue. 一旦队列中的帧数低于某个阈值,就会触发下一个加载过程,该过程会加载另一个多维数据集并将其解包到队列中。

查看下面的代码:

import numpy as np
import multiprocessing as mp
import logging
logger = mp.log_to_stderr(logging.INFO)
import time

def data_loader(event, queue, **kw):
    '''loads data from 3D image cube'''
    event.wait()        #wait for trigger before loading

    logger.info( 'Loading data' )
    time.sleep(3)                       #pretend to take long to load the data
    n = 100
    data = np.ones((n,20,20))*np.arange(n)[:,None,None]          #imaginary 3D image cube (increasing numbers so that we can track the data ordering)

    logger.info( 'Adding data to queue' )
    for d in data:
        queue.put(d)
    logger.info( 'Done adding to queue!' )


def queue_monitor(queue, triggers, threshold=50, interval=5):
    '''
    Triggers the load events once the number of data in the queue falls below 
    threshold, then doesn't trigger again until the interval has passed.  
    Note: interval should be larger than data load time.
    '''
    while len(triggers):
        if queue.qsize() < threshold:
            logger.info( 'Triggering next load' )
            triggers.pop(0).set()
            time.sleep(interval)    


if __name__ == '__main__':
    logger.info( "Starting" )
    out_queue = mp.Queue()

    #Initialise the load processes
    nprocs, procs = 3, []
    triggers = [mp.Event() for _ in range(nprocs)]
    triggers[0].set()           #set the first process to trigger immediately
    for i, trigger in enumerate(triggers):
        p = mp.Process( name='data_loader %d'%i, target=data_loader, 
                        args=(trigger, out_queue) )
        procs.append( p )
    for p in procs:
        p.start()

    #Monitoring process
    qm = mp.Process( name='queue_monitor', target=queue_monitor, 
                     args=(out_queue, triggers) )
    qm.start()

    #consume data
    while out_queue.empty():
        pass
    else:
        for d in iter( out_queue.get, None ):
            time.sleep(0.2)   #pretend to take some time to process/animate the data
            logger.info( 'data: %i' %d[0,0] )   #just to keep track of data ordering

这在某些情况下非常有效,但有时在触发新的加载过程后数据的顺序会变得混乱。我不知道为什么会发生这种情况 - mp.Queue 应该是 FIFO 对吧?!例如。运行上面的代码不会保留输出队列中的正确顺序,但是,将阈值更改为较低的值,例如。30 解决了这个问题。*如此迷茫...

所以问题:如何multiprocessing在python中正确实现这种交错加载策略?

4

1 回答 1

3

这看起来像一个缓冲问题。在内部,multiprocessing.Queue使用缓冲区临时存储您已排队的项目,并最终将它们刷新到Pipe后台线程中。只有在刷新发生之后,项目才会真正发送到其他进程。因为您将大型对象放在 上Queue,所以会进行大量缓冲。这导致加载过程实际上重叠,即使您的日志记录显示一个过程在另一个过程开始之前完成。文档实际上对这种情况有警告:

当一个对象被放入队列时,该对象被腌制并且后台线程稍后将腌制数据刷新到底层管道。这会产生一些令人惊讶的后果,但不会造成任何实际困难——如果它们真的困扰您,那么您可以改用由经理创建的队列。

  1. 将对象放入空队列后,队列的empty()方法返回 False 并且get_nowait()可以在没有 raise的情况下返回之前可能会有一个无限小的延迟Queue.Empty
  2. 如果多个进程正在对对象进行排队,则对象可能会在另一端乱序接收。但是,由同一进程排队的对象将始终处于相对于彼此的预期顺序。

我建议按照文档状态进行操作,并使用 amultiprocessing.Manager创建您的队列:

m = mp.Manager()
out_queue = m.Queue()

这将使您完全避免该问题。

另一种选择是只使用一个进程来完成所有数据加载,并让它在循环中运行,event.wait()调用位于循环的顶部。

于 2014-10-16T16:51:29.637 回答