
I wrote a python function that searches the filesystem with a provided directory pattern, with optional "actions" to take at each level. I then tried to multi-thread it, since some of the volumes live on network shares and I want to minimize IO blocking. I started out with the multiprocessing Pool class, since that was the most convenient... (seriously, no Pool class for threading? — but see the aside further down) My function unwinds the provided FS pattern as far as possible and submits the newly returned paths back to the pool, until no new paths come back. I had this working great when using the function and class directly, but now that I'm trying to use the function from another class, my program seems to hang. To simplify things I rewrote the function using threads instead of processes, and even wrote a simple thread pool class... same problem. Here's the code:

file test1.py:
------------------------------------------------

import os
import glob
from multiprocessing import Pool

def mapGlob(pool,paths,pattern):
    results = []
    paths = [os.path.join(p,pattern) for p in paths]
    for result in pool.map(glob.glob,paths):
        results += result
    return results

def findAllMyPaths():
    pool = Pool(10)
    paths = ['/Volumes']
    follow = ['**','ptid_*','expid_*','slkid_*']
    for pattern in follow:
        paths = mapGlob(pool,paths,pattern)
    return paths


file test2.py:
----------------------------------------------------------------------------

from test1 import findAllMyPaths

allmypaths = findAllMyPaths()

Now if I call:

>>>from test1 import findAllMyPaths
>>>findAllMyPaths()
>>>...long list of all the paths

this works fine, but if I try:

>>>from test2 import allmypaths

python hangs forever. The action functions (glob, in this case) are called, but they never seem to return... I need help... The parallelized version runs much faster when it is working properly (6-20x faster, depending on what "action" is being mapped onto each point in the FS tree), so I'd really like to be able to use it.

If I change the mapping function to a non-parallel version:

def mapGlob(pool,paths,pattern):
    results = []
    paths = [os.path.join(p,pattern) for p in paths]
    for path in paths:
        results += glob.glob(path)
    return results

everything runs fine.
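(As an aside to the parenthetical at the top: a thread-backed pool with the same interface does ship in the standard library as multiprocessing.dummy.Pool, also reachable as multiprocessing.pool.ThreadPool. A minimal sketch of a thread-based variant, using hypothetical names, might look like this:)

import os
import glob
from multiprocessing.dummy import Pool as ThreadPool  # thread workers, same API as Pool

def mapGlobThreaded(pool, paths, pattern):
    # Same shape as mapGlob above, but pool.map runs glob.glob in worker
    # threads, so blocking network IO can overlap instead of serializing.
    paths = [os.path.join(p, pattern) for p in paths]
    results = []
    for result in pool.map(glob.glob, paths):
        results += result
    return results

if __name__ == '__main__':
    pool = ThreadPool(10)
    print(mapGlobThreaded(pool, ['/Volumes'], '*'))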

Edit:

I turned on debugging in multiprocessing to see if that could help me any further.
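(That output can be produced with multiprocessing's built-in logging, roughly like the following minimal sketch; the SUBDEBUG level comes from multiprocessing.util:)

import multiprocessing
import multiprocessing.util

# Send multiprocessing's internal log records to stderr; SUBDEBUG (level 5)
# is multiprocessing's own extra-verbose level and produces the
# [SUBDEBUG/...] lines seen below.
logger = multiprocessing.log_to_stderr()
logger.setLevel(multiprocessing.util.SUBDEBUG)

In the case where it works I get: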

[DEBUG/MainProcess] created semlock with handle 5
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 9
[DEBUG/MainProcess] created semlock with handle 10
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] closing pool
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-2] worker got sentinel -- exiting
[DEBUG/PoolWorker-1] worker got sentinel -- exiting
[INFO/PoolWorker-2] process shutting down
[DEBUG/PoolWorker-7] worker got sentinel -- exiting
[INFO/PoolWorker-1] process shutting down
[INFO/PoolWorker-7] process shutting down
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
[INFO/PoolWorker-7] process exiting with exitcode 0
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
[INFO/PoolWorker-1] process exiting with exitcode 0
[DEBUG/PoolWorker-5] worker got sentinel -- exiting
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
[INFO/PoolWorker-5] process shutting down
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
[INFO/PoolWorker-2] process exiting with exitcode 0
[INFO/PoolWorker-5] process exiting with exitcode 0
[DEBUG/PoolWorker-6] worker got sentinel -- exiting
[INFO/PoolWorker-6] process shutting down
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
[INFO/PoolWorker-6] process exiting with exitcode 0
[DEBUG/PoolWorker-4] worker got sentinel -- exiting
[DEBUG/PoolWorker-9] worker got sentinel -- exiting
[INFO/PoolWorker-9] process shutting down
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
[INFO/PoolWorker-9] process exiting with exitcode 0
[INFO/PoolWorker-4] process shutting down
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
[INFO/PoolWorker-4] process exiting with exitcode 0
[DEBUG/PoolWorker-10] worker got sentinel -- exiting
[INFO/PoolWorker-10] process shutting down
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
[INFO/PoolWorker-10] process exiting with exitcode 0
[DEBUG/PoolWorker-8] worker got sentinel -- exiting
[INFO/PoolWorker-8] process shutting down
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
[INFO/PoolWorker-8] process exiting with exitcode 0
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[INFO/PoolWorker-3] process shutting down
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[INFO/PoolWorker-3] process exiting with exitcode 0
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers

When it doesn't work, all I get is:

[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 7
[DEBUG/MainProcess] created semlock with handle 10
[DEBUG/MainProcess] created semlock with handle 11
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()

2 Answers


Not a complete solution, but I found a way to make the code work in either guise: from the interpreter or as code in a running script. I think the problem has to do with the following note in the multiprocessing docs:

Functionality within this package requires that the main method be importable by the children. This is covered in Programming guidelines however it is worth pointing out here. This means that some examples, such as the multiprocessing.Pool examples will not work in the interactive interpreter.

I'm not sure why this limitation exists, and why I can still sometimes use a pool from the interactive interpreter and sometimes not, but oh well....

To get around it, I do the following in any module that might use multiprocessing:

import __main__
__SHOULD_MULTITHREAD__ = False
if hasattr(__main__,'__file__'):
    __SHOULD_MULTITHREAD__ = True

The rest of the code within that module can then check this flag to see whether it should use a pool or just execute without parallelization. Doing this, I can still use and test the parallelized functions in modules from the interactive interpreter; they just run much more slowly.
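For instance, here is a hypothetical sketch of how mapGlob from the question could branch on that flag (the serial fallback mirrors the non-parallel version above):

import os
import glob
import __main__
from multiprocessing import Pool

# Parallelize only when started from a script, i.e. when the main module
# is importable by the pool's child processes.
__SHOULD_MULTITHREAD__ = hasattr(__main__, '__file__')

def mapGlob(pool, paths, pattern):
    paths = [os.path.join(p, pattern) for p in paths]
    if __SHOULD_MULTITHREAD__ and pool is not None:
        mapped = pool.map(glob.glob, paths)      # parallel glob in worker processes
    else:
        mapped = [glob.glob(p) for p in paths]   # serial fallback (interactive use)
    results = []
    for result in mapped:
        results += result
    return results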

Answered 2011-01-27T20:39:56.653

If I remember correctly, shouldn't test2.py look like this instead:

from test1 import findAllMyPaths
allmypaths = findAllMyPaths

and then:

from test2 import allmypaths  
allmypaths()
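
A minimal sketch combining that suggestion with the if __name__ == '__main__': guard from the multiprocessing programming guidelines quoted in the other answer, so the pool is never created as a side effect of an import:

from test1 import findAllMyPaths

# Export the function itself; don't call it at import time.
allmypaths = findAllMyPaths

if __name__ == '__main__':
    # Only create the pool when run as a script, never on a bare import.
    print(allmypaths())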
Answered 2011-01-25T22:18:00.770