我编写了一个 python 函数,它使用提供的目录模式搜索文件系统,并在每个级别提供可选的“操作”。然后我尝试对它进行多线程处理,因为一些卷位于网络共享上,我想尽量减少 IO 阻塞。我从使用多处理 Pool 类开始,因为这是最方便的......(说真的,没有用于线程的 Pool 类?)我的函数尽可能地解开提供的 FS 模式并将新返回的路径提交给池,直到没有新路径被退回。直接使用函数和类时,我让它工作得很好,但现在我试图从另一个类中使用这个函数,我的程序似乎挂起。为了简化,我使用线程而不是进程重写了函数,甚至写了一个简单的线程池类......同样的问题。这里'
file test1.py:
------------------------------------------------
import os
import glob
from multiprocessing import Pool
def mapGlob(pool,paths,pattern):
results = []
paths = [os.path.join(p,pattern) for p in paths]
for result in pool.map(glob.glob,paths):
results += result
return results
def findAllMyPaths():
pool = Pool(10)
paths = ['/Volumes']
follow = ['**','ptid_*','expid_*','slkid_*']
for pattern in follow:
paths = mapGlob(pool,paths,pattern)
return paths
file test2.py:
----------------------------------------------------------------------------
from test1 import findAllMyPaths
allmypaths = findAllMyPaths()
现在如果我打电话
>>>from test1 import findAllMyPaths
>>>findAllMyPaths()
>>>...long list of all the paths
这工作正常,但如果尝试:
>>>from test2 import allmypaths
python永远挂起。调用了动作函数(在本例中为 glob),但它们似乎永远不会返回......我需要帮助......并行化版本在正常工作时运行得更快(取决于什么“动作”,速度快 6-20 倍)被映射到 FS 树中的每个点),所以我希望能够使用它。
如果我将映射函数更改为非并行版本:
def mapGlob(pool,paths,pattern):
results = []
paths = [os.path.join(p,pattern) for p in paths]
for path in paths:
results += glob.glob(path)
return results
一切运行良好。
编辑:
我打开了多处理中的调试,看看这是否可以进一步帮助我。在它有效的情况下,我得到:
[DEBUG/MainProcess] created semlock with handle 5
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 9
[DEBUG/MainProcess] created semlock with handle 10
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] closing pool
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-2] worker got sentinel -- exiting
[DEBUG/PoolWorker-1] worker got sentinel -- exiting
[INFO/PoolWorker-2] process shutting down
[DEBUG/PoolWorker-7] worker got sentinel -- exiting
[INFO/PoolWorker-1] process shutting down
[INFO/PoolWorker-7] process shutting down
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
[INFO/PoolWorker-7] process exiting with exitcode 0
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
[INFO/PoolWorker-1] process exiting with exitcode 0
[DEBUG/PoolWorker-5] worker got sentinel -- exiting
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
[INFO/PoolWorker-5] process shutting down
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
[INFO/PoolWorker-2] process exiting with exitcode 0
[INFO/PoolWorker-5] process exiting with exitcode 0
[DEBUG/PoolWorker-6] worker got sentinel -- exiting
[INFO/PoolWorker-6] process shutting down
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
[INFO/PoolWorker-6] process exiting with exitcode 0
[DEBUG/PoolWorker-4] worker got sentinel -- exiting
[DEBUG/PoolWorker-9] worker got sentinel -- exiting
[INFO/PoolWorker-9] process shutting down
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
[INFO/PoolWorker-9] process exiting with exitcode 0
[INFO/PoolWorker-4] process shutting down
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
[INFO/PoolWorker-4] process exiting with exitcode 0
[DEBUG/PoolWorker-10] worker got sentinel -- exiting
[INFO/PoolWorker-10] process shutting down
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
[INFO/PoolWorker-10] process exiting with exitcode 0
[DEBUG/PoolWorker-8] worker got sentinel -- exiting
[INFO/PoolWorker-8] process shutting down
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
[INFO/PoolWorker-8] process exiting with exitcode 0
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[INFO/PoolWorker-3] process shutting down
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[INFO/PoolWorker-3] process exiting with exitcode 0
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
当它不是我得到的只是:
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 7
[DEBUG/MainProcess] created semlock with handle 10
[DEBUG/MainProcess] created semlock with handle 11
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()