76

我正在寻找一个简单的基于进程的python的并行映射,即一个函数

parmap(function,[data])

这将在不同进程上的 [data] 的每个元素上运行函数(嗯,在不同的核心上,但是 AFAIK,在 python 的不同核心上运行东西的唯一方法是启动多个解释器),并返回结果列表.

这样的事情存在吗?我想要一些简单的东西,所以一个简单的模块会很好。当然,如果不存在这样的东西,我会满足于一个大图书馆:-/

4

5 回答 5

138

我似乎你需要的是multiprocessing.Pool() 中的 map 方法

映射(函数,可迭代 [,块大小])

A parallel equivalent of the map() built-in function (it supports only
one iterable argument though). It blocks till the result is ready.

This method chops the iterable into a number of chunks which it submits to the 
process pool as separate tasks. The (approximate) size of these chunks can be 
specified by setting chunksize to a positive integ

例如,如果你想映射这个函数:

def f(x):
    return x**2

到 range(10),你可以使用内置的 map() 函数:

map(f, range(10))

或使用 multiprocessing.Pool() 对象的方法 map():

import multiprocessing
pool = multiprocessing.Pool()
print pool.map(f, range(10))
于 2009-11-09T22:49:55.793 回答
9

这可以通过 Ray 优雅地完成,Ray是一个允许您轻松并行化和分发 Python 代码的系统。

要并行化您的示例,您需要使用@ray.remote装饰器定义 map 函数,然后使用.remote. 这将确保远程函数的每个实例都将在不同的进程中执行。

import time
import ray

ray.init()

# Define the function you want to apply map on, as remote function. 
@ray.remote
def f(x):
    # Do some work...
    time.sleep(1)
    return x*x

# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e., 
# an identifier of the result) rather than the result itself.  
def parmap(f, list):
    return [f.remote(x) for x in list]

# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))

# Get the results
results = ray.get(result_ids)
print(results)

这将打印:

[1, 4, 9, 16, 25]

它将以大约len(list)/p(四舍五入最接近的整数)完成,其中p是您机器上的内核数。假设一台机器有 2 个内核,我们的示例将执行5/2四舍五入,即大约3sec。

与多处理模块相比,使用 Ray 有许多优点。特别是,相同的代码将在单台机器和机器集群上运行。有关 Ray 的更多优点,请参阅此相关帖子

于 2019-02-06T23:14:47.007 回答
7

对于那些寻找与 R 的 mclapply() 等效的 Python 的人,这里是我的实现。它是对以下两个示例的改进:

它可以应用于具有单个或多个参数的映射函数。

import numpy as np, pandas as pd
from scipy import sparse
import functools, multiprocessing
from multiprocessing import Pool

num_cores = multiprocessing.cpu_count()

def parallelize_dataframe(df, func, U=None, V=None):

    #blockSize = 5000
    num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
    blocks = np.array_split(df, num_partitions)

    pool = Pool(num_cores)
    if V is not None and U is not None:
        # apply func with multiple arguments to dataframe (i.e. involves multiple columns)
        df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
    else:
        # apply func with one argument to dataframe (i.e. involves single column)
        df = pd.concat(pool.map(func, blocks))

    pool.close()
    pool.join()

    return df

def square(x):
    return x**2

def test_func(data):
    print("Process working on: ", data.shape)
    data["squareV"] = data["testV"].apply(square)
    return data

def vecProd(row, U, V):
    return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )

def mProd_func(data, U, V):
    data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
    return data

def generate_simulated_data():

    N, D, nnz, K = [302, 184, 5000, 5]
    I = np.random.choice(N, size=nnz, replace=True)
    J = np.random.choice(D, size=nnz, replace=True)
    vals = np.random.sample(nnz)

    sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])

    # Generate parameters U and V which could be used to reconstruct the matrix Y
    U = np.random.sample(N*K).reshape([N,K])
    V = np.random.sample(D*K).reshape([D,K])

    return sparseY, U, V

def main():
    Y, U, V = generate_simulated_data()

    # find row, column indices and obvseved values for sparse matrix Y
    (testI, testJ, testV) = sparse.find(Y)

    colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
    dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}

    obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
    obsValDF["obsI"] = testI
    obsValDF["obsJ"] = testJ
    obsValDF["testV"] = testV
    obsValDF = obsValDF.astype(dtype=dtypes)

    print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))

    # calculate the square of testVals    
    obsValDF = parallelize_dataframe(obsValDF, test_func)

    # reconstruct prediction of testVals using parameters U and V
    obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)

    print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
    print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])

if __name__ == '__main__':
    main()
于 2019-01-21T20:48:53.587 回答
5

Python3 的 Pool 类有一个 map() 方法,这就是并行化 map 所需的全部内容:

from multiprocessing import Pool

with Pool() as P:
    xtransList = P.map(some_func, a_list)

Usingwith Pool() as P类似于进程池,将并行执行列表中的每个项目。您可以提供核心数量:

with Pool(processes=4) as P:
于 2020-12-20T02:28:46.270 回答
3

我知道这是一篇旧文章,但以防万一,我写了一个工具来制作这个超级、超级简单的工具,称为parmapper(我实际上在使用中称它为 parmap,但名称已被使用)。

它处理大量流程的设置和解构,并添加了大量功能。按重要性的粗略顺序

  • 可以采用 lambda 和其他不可腌制的功能
  • 可以应用starmap等类似的调用方式,直接使用非常方便。
  • 可以在线程和/或进程之间拆分
  • 包括进度条等功能

它确实会产生很小的成本,但对于大多数用途来说,这可以忽略不计。

希望对你有帮助。

(注意:它与mapPython 3+ 一样,返回一个可迭代对象,因此如果您希望所有结果立即通过它,请使用list()

于 2019-05-10T20:11:13.360 回答