Threads can help even with directory traversal. I use the following code to traverse a SharePoint tree, and got a quite significant speedup with about 50 threads. This particular program returns (path, data) pairs for all XML files in a directory structure, and can be simply expanded for your use. (This is cut and pasted from my program; some additional editing is needed.)
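The paste omits its preamble, so as a minimal sketch (my reconstruction, not part of the original), the code below assumes these standard-library imports plus a small ScanError exception used to surface worker failures in the main thread:

# Reconstructed preamble (an assumption; the original paste omits it).
import queue
import sys
import threading
import time
import traceback
from datetime import datetime
from os import listdir
from os.path import getmtime, isdir, join

class ScanError(Exception):
    """Raised in the main thread when a worker reports an error."""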
# unique string for passing error messages through the result queue
ERROR = '\xffERROR\xff'

class ScanWorker(threading.Thread):
    """Worker class for scanning directory structures.
    pathQueue: queue for pathnames of directories
    resultQueue: results of processFile, pairs of (path, data)
    """
    lock = threading.Lock()
    dirCount = 0

    def __init__(self, pathQueue, resultQueue):
        self.pathQueue = pathQueue
        self.resultQueue = resultQueue
        super().__init__()

    def run(self):
        """Worker thread.
        Get a directory, process it, and put new directories on the
        queue."""
        try:
            while True:
                self.processDir(self.pathQueue.get())
                self.pathQueue.task_done()
        except Exception:
            # pass the exception on to the main thread
            description = traceback.format_exception(*sys.exc_info())
            description.insert(0,
                "Error in thread {}:\n".format(
                    threading.current_thread().name))
            self.resultQueue.put((ERROR, description))
            self.pathQueue.task_done()
    def processDir(self, top):
        """Visit a directory.
        Call self.processFile on every file, and queue the subdirectories.
        """
        # Wait and retry a few times in case of network errors;
        # SharePoint is not reliable and gives errors for no reason.
        for retryCount in range(30):
            try:
                names = listdir(top)
                break
            except OSError as e:
                if e.errno in (2, 22):  # ENOENT, EINVAL: transient errors
                    lastError = e
                    print(end="L", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("List: too many retries")
            raise lastError
        # it is not important to worry about race conditions here
        self.__class__.dirCount += 1
        # process contents
        for name in names:
            if isdir(join(top, name)):
                self.pathQueue.put(join(top, name))
            else:
                self.processFile(join(top, name))
    def processFile(self, path):
        """Read an XML file and queue its contents."""
        # only xml files
        if not path.lower().endswith('.xml'):
            return
        # modification time (unused in this snippet)
        filemtime = datetime.fromtimestamp(getmtime(path))
        # SharePoint is not reliable, gives errors for no reason; just retry
        for retryCount in range(30):
            try:
                data = open(path, 'rb').read()
                break
            except OSError as e:
                if e.errno in (2, 22):  # ENOENT, EINVAL: transient errors
                    lastError = e
                    print(end="R", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("Read: too many retries")
            raise lastError
        self.resultQueue.put((path, data))
class Scanner:
    """Interface to the ScanWorkers.
    SharePoint's per-request latency is high, and it handles 50 workers well.
    Make sure you only create one instance of Scanner!
    """
    def __init__(self, workers):
        # don't restrict the path queue length; that would cause deadlock
        # use a LIFO queue to get a more depth-first-like search,
        # reducing average queue length and hopefully improving server caching
        self.pathQueue = queue.LifoQueue()
        # this is the output queue to the main thread
        self.resultQueue = queue.Queue(5)
        self.workers = workers
        # start workers
        for i in range(workers):
            t = ScanWorker(self.pathQueue, self.resultQueue)
            t.daemon = True  # setDaemon() is deprecated
            t.start()

    def startWorkers(self, path):
        # add counter
        self.added = 0
        # and go
        self.pathQueue.put(path)

    def processResult(self, wait=True):
        """Get an element from the result queue and process it."""
        path, data = self.resultQueue.get(block=wait)
        if path == ERROR:
            # a worker raised an alarm; stop scanning and pass on the description
            raise ScanError(data)
        <do whatever you want to do with the file>
        self.resultQueue.task_done()
        self.added += 1
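# One hypothetical way to fill in the placeholder in processResult above
# (not from the original): write each (path, data) pair into a zip archive,
# assuming a zipfile.ZipFile object `zf` opened by the caller:
#     zf.writestr(path.replace('\\', '/'), data)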
# main
try:
    # set up
    scanner = Scanner(threads)
    scanner.startWorkers(rootpath)
    pathQueue, resultQueue = scanner.pathQueue, scanner.resultQueue
    # scanner is rolling; wait for it to finish
    with pathQueue.all_tasks_done:
        while pathQueue.unfinished_tasks:
            # tasks are still running
            # process results
            while True:
                try:
                    scanner.processResult(wait=False)
                except queue.Empty:
                    break
            # no new files found; check if the scanner is ready
            done = pathQueue.all_tasks_done.wait(timeout=1)
            if not done:
                # not yet; print something while we wait
                print(
                    "\rProcessed {} files from {} directories [{} {}] "
                    .format(
                        scanner.added,
                        ScanWorker.dirCount,
                        pathQueue.unfinished_tasks,
                        resultQueue.unfinished_tasks,
                    ), end='\r')
    # just to make sure everybody is ready: join the path queue
    pathQueue.join()
    # process the remainder of the result queue
    while resultQueue.unfinished_tasks:
        scanner.processResult(wait=True)
    # go to a new line to prevent overwriting progress messages
    print()
except ScanError as e:
    print()
    print(*e.args[0], end='')
    print("Process interrupted.")
except KeyboardInterrupt:
    print("\nProcess interrupted.")
    print()
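For completeness: the main block assumes `threads` and `rootpath` are defined earlier. Hypothetical values (not from the original) might look like:

# Hypothetical configuration for the main block (not part of the original):
threads = 50                                  # ~50 workers suit SharePoint well
rootpath = r'\\sharepoint.example\site\docs'  # root of the tree to scan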