2

谢谢你看看这个。我承认我已经在 python 中涉足了 1 周的并行处理,所以如果我错过了一个明显的解决方案,我深表歉意。我有一段代码,我想运行几个不同的 mp.pool() 实例。主 .py 文件中的那些工作正常,但是当我尝试将它们添加到模块中的函数时,我没有从它们中得到任何输出。该应用程序只是运行过去并继续。我认为这可能与这篇文章有关,但它没有就替代方法提供任何想法来完成我所需要的。在一个简单示例中工作的代码是这样的:

import multiprocessing as mp
def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) / 10))

def meat():
    print 'beef'
    status = True
    return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()


def veggie():
    print 'carrot'
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()

不起作用的代码是:

import multiprocessing as mp
def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) / 10))

def meat():
    print 'beef'
    status = True
    return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) / 10))

    def veggie():
        print 'carrot'
        status = True
        return status

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()
nested_stupid_fn()

最终,我希望通过将它存在于单独模块中的另一个函数中来删除一个不起作用的示例。因此,当我导入模块 packngo 并将其用作 packngo.basic_packngo(inputs) 并在其中某处具有嵌套函数的内容时,它们将运行。任何帮助将不胜感激。:DI 是一个非常简单的人,所以如果你能像对孩子一样解释,也许它会在我脑海中浮现!

4

2 回答 2

3

您链接的另一个问题有解决方案,只是没有说明:您不能使用嵌套函数作为/系列方法的func参数。它们适用于,因为由可以直接传递函数引用的线程支持,但必须对函数进行腌制,并且只有具有可导入名称的函数才能被腌制。如果您检查嵌套函数的名称,它类似于,并且该组件使其无法导入(这通常是一件好事;利用嵌套的嵌套函数通常在闭包范围内具有临界状态,仅导入就会失去)。apply**map*multiprocessing.Poolmultiprocessing.dummy.Poolmultiprocessing.dummymultiprocessing.Poolmodulename.outerfuncname.<locals>.innerfuncname<locals>

以嵌套方式定义函数非常好callback,因为它们是在父进程中执行的,它们不会发送给工作人员。在您的情况下,只有回调依赖于闭包范围,因此将func( veggie) 移出全局范围是非常好的,将您的packngo模块定义为:

def veggie():
    print 'carrot'
    status = True
    return status

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) / 10))

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()
nested_stupid_fn()

是的,这意味着veggie成为相关模块的公共成员。如果你想表明它应该被视为一个实现细节,你可以在它前面加上一个下划线 ( _veggie),但它必须是全局的才能与它一起使用multiprocessing.Pool

于 2019-06-28T16:36:00.547 回答
0

好吧,我认为问题在于multiproc_log_result变量的范围内results不存在。因此,您应该做的是将异步调用的结果直接附加到结果中。不过,您将无法跟踪进度(我猜无法直接为类之外的回调函数共享全局变量)

from multiprocessing.pool import ThreadPool

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)

    def veggie():
        print 'carrot'
        status = True
        return status

    results = []
    pool = ThreadPool(thread_count)
    for x in range(10):
        results.append(pool.apply_async(veggie))

    pool.close()
    pool.join()

    results = [result.get() for result in results]  # get value from async result

    ...then do stuff with results
于 2018-03-01T09:51:58.613 回答