我正在尝试使用 Python 读取和修改多个文件的每一行。每个文件都有数千到数十万行,因此每个文件只有在处理后才会处理另一个文件。我正在尝试阅读以下文件:
csvReader = csv.reader(open("file","r")
for row in csvReader:
handleRow(row)
我想使用多线程来使用不同的线程并行读取每个文件以节省时间。谁能指出它是否有用以及如何实现它?
我正在尝试使用 Python 读取和修改多个文件的每一行。每个文件都有数千到数十万行,因此每个文件只有在处理后才会处理另一个文件。我正在尝试阅读以下文件:
csvReader = csv.reader(open("file","r")
for row in csvReader:
handleRow(row)
我想使用多线程来使用不同的线程并行读取每个文件以节省时间。谁能指出它是否有用以及如何实现它?
它可能有用也可能没用——如果所有文件都在同一个驱动器上,并且您已经尽可能快地推动驱动器,那么多路复用只会减慢速度。但是如果你没有最大化你的 I/O,它会加快速度。
至于怎么做,那是微不足道的。将您的代码包装在一个采用路径名的函数中,然后使用 concurrent.futures.ThreadPoolExecutor 或 multiprocessing.dummy.Pool ,这是将您的函数映射到整个可迭代路径名的一行代码:
with ThreadPoolExecutor(4) as executor:
executor.map(func, paths)
还有一件事:如果你不能最大化 I/O 的原因是因为你在每一行上做了太多的 CPU 工作,线程在 Python 中将无济于事(因为 GIL),但你可以使用进程——完全相同的代码,但使用了 ProcessorPoolExecutor。
可能您的瓶颈是 I/O,所以多线程无济于事;无论如何,很容易尝试:以下代码通过对每一行应用给定的字符串函数并将新文件写入给定路径来详细说明当前目录中的所有文件,一个文件线程。
from threading import Thread
from os import listdir
from os.path import basename, join, isfile
class FileChanger(Thread):
def __init__(self, sourcefilename, rowfunc, tgpath):
Thread.__init__(self)
self.rowfunc = rowfunc
self.sfname = sourcefilename
self.tgpath = tgpath
def run(self):
tgf = open(join(self.tgpath, basename(self.sfname)), 'w')
for r in open(self.sfname):
tgf.write(self.rowfunc(r))
tgf.close()
# main #
workers = [FileChanger(f, str.upper, '/tmp/tg') \
for f in listdir('.') if isfile(f)]
for w in workers:
w.start()
for w in workers:
w.join()