3

我对 python 相当陌生,正在制作一个脚本,允许将点云数据从其他程序带入 Autodesk Maya。我的脚本运行良好,但我想做的是让它更快。我有一个 for 循环,它遍历编号文件的列表。即datafile001.txt、datafile002.txt等。我想知道是否有办法让它一次做一个以上的事情,可能使用线程或队列?下面是我一直在处理的代码:

     def threadedFuntion(args):
         if len(sourceFiles) > 3:
             for count, item in enumerate(sourceFiles):
                     t1=Thread(target=convertPcToPdc,args=(sourceFiles[filenumber1], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType))
                     t1.start()
                     t2=Thread(target=convertPcToPdc,args=(sourceFiles[filenumber2], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType))
                     t2.start()
                     t3=Thread(target=convertPcToPdc,args=(sourceFiles[filenumber3], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType))
                     t3.start()
                     t4=Thread(target=convertPcToPdc,args=(sourceFiles[filenumber4], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType))
                     t4.start()

由于多种原因,这显然不起作用,首先它只会创建 4 个线程,我希望能够提供或多或少的选项。第二个错误是因为它试图重用一个线程?就像我说的那样,我对 python 很陌生,而且有点过头了,我已经在这里阅读了几篇文章,但无法让一篇文章完全正确地工作。我认为队列可能是我需要但无法弄清楚的东西,我尝试了条件语句和连接语句,但再次无法得到我想要的。

我想更具体地说,我想要实现的是该函数正在读取文本文件,检索坐标,然后将它们导出为二进制文件以供 Maya 读取。这些文本文件中的一个通常具有 5-1000 万个 x、y、z 坐标,这需要相当长的时间。在一台非常糟糕的计算机上完成一个文件大约需要 30 分钟到 1 小时,任务管理器说 python 只使用 12% 的处理器和大约 1% 的内存,所以如果我可以一次执行多个这些,它会做这 100 个或更多文件的速度要快得多。我认为多线程/排队 for 循环并不难,但我已经迷失了大约一周的时间并尝试了失败的解决方案。

谢谢大家的帮助,我真的很感激,并认为这个网站很棒。这是我的第一篇文章,但我觉得我只是通过阅读这里完全学会了 python。

4

2 回答 2

1

子类 threading.Thread 并将您的工作函数作为 run() 的一部分放在该类中。

import threading
import time
import random

class Worker(threading.Thread):
    def __init__(self, srcfile, printlock,**kwargs):
        super(Worker,self).__init__(**kwargs)
        self.srcfile = srcfile
        self.lock = printlock # so threads don't step on each other's prints

    def run(self):
        with self.lock:
            print("starting %s on %s" % (self.ident,self.srcfile))
        # do whatever you need to, return when done
        # example, sleep for a random interval up to 10 seconds
        time.sleep(random.random()*10)
        with self.lock:
            print("%s done" % self.ident)


def threadme(srcfiles):
    printlock = threading.Lock()
    threadpool = []
    for file in srcfiles:
        threadpool.append(Worker(file,printlock))

    for thr in threadpool:
        thr.start()

    # this loop will block until all threads are done
    # (however it won't necessarily first join those that are done first)
    for thr in threadpool:
        thr.join()

    print("all threads are done")

if __name__ == "__main__":
    threadme(["abc","def","ghi"])

根据要求,要限制线程数,请使用以下命令:

def threadme(infiles,threadlimit=None,timeout=0.01):
    assert threadlimit is None or threadlimit > 0, \
           "need at least one thread";
    printlock = threading.Lock()
    srcfiles = list(infiles)
    threadpool = []

    # keep going while work to do or being done
    while srcfiles or threadpool:

        # while there's room, remove source files
        # and add to the pool
        while srcfiles and \
           (threadlimit is None \
            or len(threadpool) < threadlimit):
            file = srcfiles.pop()
            wrkr = Worker(file,printlock)
            wrkr.start()
            threadpool.append(wrkr)

        # remove completed threads from the pool
        for thr in threadpool:
            thr.join(timeout=timeout)
            if not thr.is_alive():
                threadpool.remove(thr)

    print("all threads are done")

if __name__ == "__main__":
    for lim in (1,2,3,4):
        print("--- Running with thread limit %i ---" % lim)
        threadme(("abc","def","ghi"),threadlimit=lim)

请注意,这实际上将反向处理源(由于列表 pop())。如果您要求它们按顺序完成,请在某处反转列表,或使用 deque 和 popleft()。

于 2012-10-13T01:25:39.093 回答
0

我建议为此使用mrjob

Mr Job 是map reduce的 python 实现。

下面是对大量文本文件进行多线程字数统计的 mr 作业代码:

from mrjob.job import MRJob

class MRWordCounter(MRJob):
    def get_words(self, key, line):
        for word in line.split():
            yield word, 1

    def sum_words(self, word, occurrences):
        yield word, sum(occurrences)

    def steps(self):
        return [self.mr(self.get_words, self.sum_words),]

if __name__ == '__main__':
    MRWordCounter.run()

此代码并行映射所有文件(计算每个文件的字数),然后将各种计数减少为一个总字数。

于 2012-10-13T00:56:39.327 回答