
Suppose I have a function processing. Instead of running it once per argument sequentially, one after the other, I want to run the same function multiple times in parallel for several different arguments.

import rasterio

def processing(image_location):
    image = rasterio.open(image_location)
    ...
    ...
    return result

# calling the function serially, one after the other, with different parameters, and saving the results to variables
results1 = processing(r'/home/test/image_1.tif')
results2 = processing(r'/home/test/image_2.tif')
results3 = processing(r'/home/test/image_3.tif')

For example, if I run delineation(r'/home/test/image_1.tif'), then delineation(r'/home/test/image_2.tif'), and then delineation(r'/home/test/image_3.tif') as in the code above, they run sequentially one after the other: if a single call takes 5 minutes, running all three takes 5 × 3 = 15 minutes. So I would like to know whether I can run these three calls in parallel (embarrassingly parallel), so that executing the function for all three different arguments takes only 5 minutes.

Help me do this in the fastest way possible. By default, the script should be able to use all available resources/CPUs/RAM to perform the task.


4 Answers


You can use a pool from the multiprocessing library to execute the function in parallel and save the results to a results variable:

from multiprocessing.pool import ThreadPool

images = [r'/home/test/image_1.tif', r'/home/test/image_2.tif', r'/home/test/image_3.tif']

# ThreadPool defaults to one worker per CPU core
with ThreadPool() as pool:
    results = pool.map(delineation, images)
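Note that ThreadPool runs the calls in threads, which helps most when delineation spends its time on I/O (e.g. reading the images from disk). If the work is CPU-bound, a process pool sidesteps the GIL; a minimal sketch with the same map API, assuming delineation is defined at module level:

from multiprocessing import Pool

images = [r'/home/test/image_1.tif', r'/home/test/image_2.tif', r'/home/test/image_3.tif']

if __name__ == '__main__':  # required on platforms that spawn worker processes
    # One worker process per CPU core by default
    with Pool() as pool:
        results = pool.map(delineation, images)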
answered 2020-09-11T06:27:49.133

You might want to take a look at IPython Parallel. It lets you easily run functions on a load-balanced (local) cluster.

For this small example, make sure you have IPython Parallel, NumPy, and Pillow installed. To run the example, you first need to start the cluster. To start a local cluster with four parallel engines, type the following in a terminal (one engine per processor core seems a reasonable choice):

ipcluster start -n 4

Then you can run the following script, which searches a given directory for jpg images and counts the number of pixels in each one:

import ipyparallel as ipp


rc = ipp.Client()
with rc[:].sync_imports():  # import on all engines
    import numpy
    from pathlib import Path
    from PIL import Image


lview = rc.load_balanced_view()  # default load-balanced view
lview.block = True  # block until map() is finished


@lview.parallel()
def count_pixels(fn: Path):
    """Silly function to count the number of pixels in an image file"""
    im = Image.open(fn)
    xx = numpy.asarray(im)
    num_pixels = xx.shape[0] * xx.shape[1]
    return fn.stem, num_pixels


pic_dir = Path('Pictures')
fn_lst = pic_dir.glob('*.jpg')  # list all jpg-files in pic_dir

results = count_pixels.map(fn_lst)  # execute in parallel

for n_, cnt in results:
    print(f"'{n_}' has {cnt} pixels.")
answered 2020-09-17T22:25:47.913

I think one of the simplest ways is to use joblib:

import joblib

allJobs = []
allJobs.append(joblib.delayed(processing)(r'/home/test/image_1.tif'))
allJobs.append(joblib.delayed(processing)(r'/home/test/image_2.tif'))
allJobs.append(joblib.delayed(processing)(r'/home/test/image_3.tif'))

results = joblib.Parallel(n_jobs=joblib.cpu_count(), verbose=10)(allJobs)
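The same call is often written more compactly with a generator expression; n_jobs=-1 likewise tells joblib to use all available CPUs (image_paths here is just a hypothetical name for the list of the three file paths):

image_paths = [r'/home/test/image_1.tif', r'/home/test/image_2.tif', r'/home/test/image_3.tif']
results = joblib.Parallel(n_jobs=-1, verbose=10)(
    joblib.delayed(processing)(p) for p in image_paths
)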

answered 2020-09-21T13:52:23.333

Another way of writing it, using the multiprocessing library (see @Alderven's answer for a different approach).

import multiprocessing as mp
import numpy as np

def calculate(input_args):
    result = input_args * 2
    return result

if __name__ == '__main__':
    N = mp.cpu_count()
    parallel_input = np.arange(0, 100)
    print('Amount of CPUs ', N)
    print('Amount of iterations ', len(parallel_input))

    with mp.Pool(processes=N) as p:
        results = p.map(calculate, list(parallel_input))

The results variable will hold a list with the processed data, which you can then write out however you like.
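For example, a minimal sketch of that last step, assuming you simply want to persist the list to disk (the filename is an assumption):

# results is a plain Python list, so numpy can save it directly
np.save('results.npy', np.asarray(results))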

answered 2020-09-21T08:52:10.860