5

我有几个文件驻留在服务器上,我试图实现一个多线程进程来提高性能,我阅读了一个教程,但实现它的问题很少,

这是文件,

filelistread = ['h:\\file1.txt', \
                'h:\\file2.txt', \
                'h:\\file3.txt', \
                'h:\\file4.txt']

filelistwrte = ['h:\\file1-out.txt','h:\\file2-out.txt','h:\\file3-out.txt','h:\\file4-out.txt']


def workermethod(inpfile, outfile):
    f1 = open(inpfile,'r')
    f2 = open(outfile,'w')
    x = f1.readlines()
    for each in x:
        f2.write(each)
    f1.close()
    f2.close()

如何使用线程类和队列来实现?

我从下面的课程开始,但不确定如何将 inpfile 和 outputfile 传递给 run 方法。任何输入都表示赞赏

class ThreadUrl(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
4

1 回答 1

29

您正在混合两种不同的解决方案。

如果您想为每个文件创建一个专用的工作线程,则不需要任何队列。如果要创建线程池和文件队列,则不希望inpfileand传递outfilerun方法;您想将它们放入队列中的每个作业中。

你如何在两者之间做出选择?嗯,第一个显然更简单,但是如果你有 1000 个文件要复制,你最终会创建 1000 个线程,这比你想要创建的线程多,而且线程数远远多于并行副本的数量操作系统将能够处理。一个线程池可以让你创建,比如说,8 个线程,并将 1000 个作业放在一个队列中,它们将被适当地分配给线程,因此一次运行 8 个作业。

让我们从解决方案 1 开始,每个文件都有一个专用的工作线程。

首先,如果您没有与 subclassing 结婚Thread,那么这里真的没有理由这样做。您可以将一个target函数和一个args元组传递给默认构造函数,然后该run方法将target(*args)完全按照您的意愿执行。所以:

t = threading.Thread(target=workermethod, args=(inpfile, outfile))

这就是你所需要的。当每个线程运行时,它会调用workermethod(inpfile, outfile)然后退出。

但是,如果您出于某种原因确实想要子类化,您可以。您可以在构建时Thread传递inpfileand ,您的方法将被修改为使用而不是获取参数。像这样:outfilerunworkermethodself.inpfileself.outfile

class ThreadUrl(threading.Thread):
    def __init__(self, inpfile, outfile):
        threading.Thread.__init__(self)
        self.inpfile, self.outfile = inpfile, outfile

    def run(self):
        f1 = open(self.inpfile,'r')
        f2 = open(self.outfile,'w')
        x = f1.readlines()
        for each in x:
            f2.write(each)
        f1.close()
        f2.close()

无论哪种方式,我都建议使用with语句而不是显式openand close,并摆脱readlines(不必要地将整个文件读入内存),除非您需要处理真正旧版本的 Python:

    def run(self):
        with open(self.inpfile,'r') as f1, open(self.outfile,'w') as f2:
            for line in f1:
                f2.write(line)

现在,解决方案 2:线程池和队列。

同样,您在这里不需要子类;两种做事方式之间的差异与解决方案 1 中的相同。但是坚持您已经开始的子类设计,您需要这样的东西:

class ThreadUrl(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            inpfile, outfile = self.queue.get()
            workermethod(inpfile, outfile)

然后你通过将一个单一的传递queue给所有线程来启动你的线程:

q = queue.Queue
threadpool = [ThreadUrl(q) for i in range(poolsize)]

并提交这样的工作:

q.put((inpfile, outfile))

如果您要认真使用线程池,您可能希望考虑使用健壮、灵活、简单和优化的实现,而不是自己编写代码。例如,您可能希望能够取消作业、很好地关闭队列、加入整个池而不是一个一个地加入线程、进行批处理或智能负载平衡等。

如果您使用的是 Python 3,则应该查看标准库ThreadPoolExecutor。如果你被 Python 2 卡住了,或者不知道Futures,你可能想看看ThreadPool隐藏在multiprocessing模块中的类。两者都具有从多线程切换到多处理的优势(例如,如果您有一些需要与 IO 并行化的 CPU 密集型工作)是微不足道的。你也可以搜索 PyPI,你会发现许多其他好的实现。

作为旁注,您不想调用 queue queue,因为这会隐藏模块名称。workermethod此外,有一个名为实际上是一个自由函数而不是一个方法的东西有点令人困惑。

最后,如果您所做的只是复制文件,您可能不想在文本模式下阅读,或者逐行阅读。事实上,您可能根本不想自己实现它。只需使用shutil. 您可以使用上述任何一种方法非常轻松地做到这一点。例如,而不是这个:

t = threading.Thread(target=workermethod, args=(inpfile, outfile))

做这个:

t = threading.Thread(target=shutil.copyfile, args=(inpfile, outfile))

实际上,看起来您的整个程序可以替换为:

threads = [threading.Thread(target=shutil.copyfile, args=(inpfile, outfile))
           for (inpfile, outfile) in zip(filelistread, filelistwrte)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
于 2013-01-02T23:19:38.190 回答