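I'm writing a script that animates image data. I have a number of large image cubes (3D arrays). For each of these, I step through the frames in the cube, and once I get near the end of it, I load the next cube and continue. Because each cube is large, the load time is significant (~5s). I would like the animation to transition between cubes seamlessly (while also conserving memory), so I'm staggering the load processes. I've made some progress towards a solution, but some problems persist.
The code below loads each data cube, splits it into frames and puts them into a multiprocessing.Queue. Once the number of frames in the queue falls below a certain threshold, the next load process is triggered, which loads another cube and unpacks it into the queue.
Check out the code below:
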
import logging
import multiprocessing as mp
import time

import numpy as np

logger = mp.log_to_stderr(logging.INFO)

def data_loader(event, queue, **kw):
    '''loads data from 3D image cube'''
    event.wait()  # wait for trigger before loading
    logger.info('Loading data')
    time.sleep(3)  # pretend to take long to load the data
    n = 100
    # imaginary 3D image cube (increasing numbers so that we can track the data ordering)
    data = np.ones((n, 20, 20)) * np.arange(n)[:, None, None]
    logger.info('Adding data to queue')
    for d in data:
        queue.put(d)
    logger.info('Done adding to queue!')

def queue_monitor(queue, triggers, threshold=50, interval=5):
    '''
    Triggers the load events once the number of data in the queue falls below
    threshold, then doesn't trigger again until the interval has passed.
    Note: interval should be larger than data load time.
    '''
    while len(triggers):
        if queue.qsize() < threshold:
            logger.info('Triggering next load')
            triggers.pop(0).set()
            time.sleep(interval)

if __name__ == '__main__':
    logger.info('Starting')
    out_queue = mp.Queue()

    # Initialise the load processes
    nprocs, procs = 3, []
    triggers = [mp.Event() for _ in range(nprocs)]
    triggers[0].set()  # set the first process to trigger immediately
    for i, trigger in enumerate(triggers):
        p = mp.Process(name='data_loader %d' % i, target=data_loader,
                       args=(trigger, out_queue))
        procs.append(p)
    for p in procs:
        p.start()

    # Monitoring process
    qm = mp.Process(name='queue_monitor', target=queue_monitor,
                    args=(out_queue, triggers))
    qm.start()

    # Consume data
    while out_queue.empty():
        pass
    else:
        # Test with `is None` rather than iter(out_queue.get, None): the latter
        # evaluates `array == None`, whose truth value is ambiguous for
        # multi-element NumPy arrays.
        while True:
            d = out_queue.get()
            if d is None:
                break
            time.sleep(0.2)  # pretend to take some time to process/animate the data
            logger.info('data: %i' % d[0, 0])  # just to keep track of data ordering
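
This works really well in some cases, but sometimes the order of the data gets jumbled after a new load process is triggered. I can't figure out why this happens; mp.Queue should be FIFO, right?! For example, running the code above as is does not preserve the correct order in the output queue, whereas changing the threshold to a lower value, e.g. 30, fixes this. So confused...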
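
For comparison, here is a minimal sketch of a variant in which only one process ever writes to the queue. It is not the script above: the names `make_cube`, `loader` and `play`, the tiny cube sizes and the `None` end marker are all assumptions made up for illustration. The idea it is meant to show is that a single producer puts frames in a well-defined order, and that a bounded queue (`maxsize`) makes `put` block, so the loader cannot run far ahead of the consumer. It does not by itself explain the jumbled order seen with several loader processes.

```python
import multiprocessing as mp
import time

import numpy as np


def make_cube(cube_id, n_frames=5, shape=(4, 4)):
    """Hypothetical stand-in for a slow cube load.

    Every pixel of frame k in cube c holds c * n_frames + k, so the
    global frame order can be read back from any pixel.
    """
    start = cube_id * n_frames
    index = np.arange(start, start + n_frames)[:, None, None]
    return np.ones((n_frames, *shape)) * index


def loader(queue, n_cubes, n_frames):
    """Single producer: load the cubes one after another and enqueue frames."""
    for cube_id in range(n_cubes):
        time.sleep(0.05)  # pretend the load is slow
        for frame in make_cube(cube_id, n_frames):
            queue.put(frame)  # blocks while the bounded queue is full
    queue.put(None)  # end-of-stream marker (an assumption of this sketch)


def play(n_cubes=3, n_frames=5, maxsize=4):
    """Consume frames until the end marker; return the frame numbers seen."""
    queue = mp.Queue(maxsize=maxsize)
    proc = mp.Process(target=loader, args=(queue, n_cubes, n_frames))
    proc.start()
    seen = []
    while True:
        frame = queue.get()
        if frame is None:  # identity test; `== None` is ambiguous for arrays
            break
        seen.append(int(frame[0, 0]))
    proc.join()
    return seen


if __name__ == '__main__':
    print(play())
```

In this sketch `maxsize` plays roughly the role of the threshold: it caps how many frames are held in memory ahead of the animation, at the cost of the loader sitting idle while the queue is full.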
So, the question: how do I correctly implement this staggered loading strategy with multiprocessing in Python?