0

我已经编写了一个 Python 脚本,用于使用 GDAL 开源库和该库提供的命令行实用程序来平铺图像。首先,我读取了一个输入数据集,它告诉我每个图块范围。然后,我遍历图块并启动一个子进程来调用 gdalwarp,以便将输入图像剪辑到循环中的当前图块。

我不使用 Popen.wait() ,因为这会阻止瓷砖被同时处理,但我确实想跟踪子进程返回的任何消息。此外,一旦创建了特定的图块,我需要使用 gdalinfo 计算新文件的统计信息,这需要另一个子进程。

这是代码:

processing = {}
for tile in tileNums:
    subp = subprocess.Popen(['gdalwarp', '-ot', 'Int16', '-r', 'cubic', '-of', 'HFA', '-cutline', tileIndexShp, '-cl', os.path.splitext(os.path.basename(tileIndexShp))[0], '-cwhere', "%s = '%s'" % (tileNumField, tile), '-crop_to_cutline', os.path.join(inputTileDir, 'mosaic_Proj.vrt'), os.path.join(outputTileDir, "Tile_%s.img" % regex.sub('_', tile))], stdout=subprocess.PIPE)
    processing[tile] = [subp]

while processing:
    for tile, subps in processing.items():
        for idx, subp in enumerate(subps):
            if subp == None: continue
            poll = subp.poll()
            if poll == None: continue
            elif poll != 0:
                subps[idx] = None
                print tile, "%s Unsuccessful" % ("Retile" if idx == 0 else "Statistics")
            else:
                subps[idx] = None
                print tile, "%s Succeeded" % ("Retile" if idx == 0 else "Statistics")
                if subps == [None, None]:
                    del processing[tile]
                    continue
                subps.append(subprocess.Popen(['gdalinfo', '-stats', os.path.join(outputTileDir, "Tile_%s.img" % regex.sub('_',tile))], stdout=subprocess.PIPE))

在大多数情况下,这对我有用,但我看到的一个问题是,当它到达最后一个图块时,它似乎创建了一个无限循环。我知道这不是最好的方法,但我对子流程模块很陌生,我基本上只是把它放在一起尝试让它工作。

谁能推荐一种更好的方法来遍历图块列表,为每个可以同时处理的图块生成一个子进程,并在每个图块的第一个子进程完成时生成第二个子进程?

更新: 感谢到目前为止的所有建议。我尝试重构上面的代码以利用多处理模块和池。

这是新代码:

def ProcessTile(tile):

    tileName = os.path.join(outputTileDir, "Tile_%s.img" % regex.sub('_', tile))

    warp = subprocess.Popen(['gdalwarp', '-ot', 'Int16', '-r', 'cubic', '-of', 'HFA', '-cutline', tileIndexShp, '-cl', os.path.splitext(os.path.basename(tileIndexShp))[0], '-cwhere', "%s = '%s'" % (tileNumField, tile), '-crop_to_cutline', os.path.join(inputTileDir, 'mosaic_Proj.vrt'), tileName], stdout=subprocess.PIPE)
    warpMsg = tile, "Retile %s" % "Successful" if warp.wait() == 0 else "Unsuccessful"

    info = subprocess.Popen(['gdalinfo', '-stats', tileName], stdout=subprocess.PIPE)
    statsMsg = tile, "Statistics %s" % "Successful" if info.wait() == 0 else "Unsuccessful"

    return warpMsg, statsMsg

print "Retiling..."
pool = multiprocessing.Pool()
for warpMsg, statsMsg in pool.imap_unordered(ProcessTile, tileNums): print "%s\n%s" % (warpMsg, statsMsg)

这给我带来了一些重大问题。首先,我最终创建了许多新流程。大约一半是 python.exe,另一半是另一个 gdal 实用程序,如果它已经在另一个平铺方案 (gdalbuildvrt.exe) 中平铺,我在上面的代码之前调用它来镶嵌传入的图像。在正在创建的所有 python.exe 和 gdalbuildvrt.exe 进程之间,大约 25% 的 CPU(超线程时具有 8 个内核的 Intel I7)和 99% 的 16gb RAM 正在使用中,计算机完全挂起。我什至无法杀死任务管理器中的进程或通过带有 taskkill 的命令行。

我在这里想念什么?

4

2 回答 2

2

使用 python多处理模块创建进程,而不是生成和管理单个子进程。

于 2012-12-17T22:00:00.840 回答
0

我还没有测试过,但它应该可以工作:

import Queue

from threading import Thread

class Consumer(Thread):
    def __init__(self, queue=None):
        super(Consumer, self).__init__()

        self.daemon = True
        self.queue = queue


    def run(self):
        while True:
            task = self.queue.get()

            # Spawn your process and .wait() for it to finish.

            self.queue.task_done()

if __name__ == '__main__':
     queue = Queue.Queue()

     for task in get_tasks():
         queue.put(task)

     # You spawn 20 worker threads to process your queue nonstop
     for i in range(20):
         consumer = Consumer(queue)
         consumer.start()

     queue.join()

基本上,你有一个队列,里面装满了你需要完成的任务。然后,您只需生成 20 个工作线程来不断地从队列中拉出新任务并同时处理它们。

于 2012-12-17T22:03:26.373 回答