1

如何使多处理器系统工作,从而在列表中生成新作业?我不断得到:

断言 self._popen 是无,'不能启动一个进程两次' AttributeError:'Worker' 对象没有属性 '_popen'

这是有道理的,因为我基本上是在制作同一项工作的多个实例……那我该如何解决呢?我需要设置多处理器池吗?

让我知道是否需要进一步澄清。

这是我的多处理类:

class Worker(multiprocessing.Process):

    def __init__(self, output_path, source, file_name):
        self.output_path = output_path
        self.source = source
        self.file_name = file_name

    def run(self):

        t = HTML(self.source)
        output = open(self.output_path+self.file_name+'.html','w')
        word_out = open(self.output_path+self.file_name+'.txt','w')  
        try:
            output.write(t.tokenized)

            for w in word_list:
                if w:
                    word_out.write(w+'\n')

            word_out.close()
            output.close()
            word_list = []

        except IndexError: 
            output.write(s[1])
            output.close()
            word_out.close()

        except UnboundLocalError:
            output.write(s[1])
            output.close()
            word_out.close()   

这是实现整个事情的类。

class implement(HTML):

    def __init__(self, input_path, output_path):
        self.input_path = input_path
        self.output_path = output_path

    def ensure_dir(self, directory):
        if not os.path.exists(directory):
            os.makedirs(directory)
        return directory    

    def prosses_epubs(self):
        for root, dirs, files in os.walk(self.input_path+"\\"):
            epubs = [root+file for file in files if file.endswith('.epub')]
            output_file = [self.ensure_dir(self.output_path+"\\"+os.path.splitext(os.path.basename(e))[0]+'_output\\') for e in epubs]

        count = 0 
        for e in epubs:
            epub = epubLoader(e)

            jobs = []

            # this is what's breaking everything right here. I'm not sure how to fix it. 
            for output_epub in epub.get_html_from_epub():                
                worker = Worker(output_file[count], output_epub[1], output_epub[0])
                jobs.append(worker)
                worker.start()

            for j in jobs:
                j.join()

            count += 1
        print "done!"


if __name__ == '__main__':
    test = implement('some local directory', 'some local directory')    
    test.prosses_epubs()

对此的任何帮助将不胜感激。也让我知道我在代码中所做的事情是否可以做得更好......我一直在努力学习如何以最好的方式做事。

4

1 回答 1

3
  • 当功能足够时不要使用类。在这种情况下,您的每个类本质上都有一个丰富的方法,并且该__init__方法只是保存在多肉方法中使用的参数。您可以通过将多肉方法作为函数并将参数直接传递给它来使您的代码更流畅。
  • 将“工作”(即任务)的概念与“工人”(即流程)的概念分开。您的机器的处理器数量有限,但作业数量可能要多得多。您不想为每个作业打开一个新进程,因为这可能会淹没您的 CPU——本质上是对自己进行分叉轰炸。
  • 使用该with语句来保证您的文件句柄被关闭。我在三个不同的地方看到output.close()word_out.close()被叫到。您可以使用 -statement 消除所有这些行,with一旦 Python 离开 -suite,它将自动关闭这些文件句柄with
  • 我认为多处理池可以很好地与您的代码配合使用。可以使用 将作业发送给池中的工作人员pool.apply_async。每个调用排队一个作业,该作业将等待池中的工作人员可以处理它。pool.join() 导致主进程等待,直到所有作业完成。
  • 使用os.path.join而不是使用'\\'. 这将使您的代码与非 Windows 机器兼容。
  • 使用枚举而不是手动实现/递增计数器。它减少了打字,并使您的代码更具可读性。

以下代码将不会运行,因为epubLoader,HTMLword_list未定义,但它可以让您更清楚地了解我上面的建议:

import multiprocessing as mp

def worker(output_path, source, filename):
    t = HTML(source)
    output_path = output_path+filename
    output = open(output_path+'.html', 'w')
    word_out = open(output_path+'.txt','w')
    with output, word_out:
        try:
            output.write(t.tokenized)

            for w in word_list:
                if w:
                    word_out.write(w+'\n')

            word_list = []

        except IndexError: 
            output.write(s[1])

        except UnboundLocalError:
            output.write(s[1])


def ensure_dir(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)
    return directory    


def process_epubs(input_path, output_path):
    pool = mp.Pool()

    for root, dirs, files in os.walk(input_path):
        epubs = [os.path.join(root, file) for file in files
                 if file.endswith('.epub')]
        output_file = [
            ensure_dir(
                os.path.join(
                    output_path,
                    os.path.splitext(os.path.basename(e))[0] + '_output')
                for e in epubs)]

    for count, e in enumerate(epubs):
        epub = epubLoader(e)
        for filename, source in epub.get_html_from_epub():
            pool.apply_async(
                worker,
                args=(output_file[count], source, filename))
    pool.close()
    pool.join()

    print "done!"


if __name__ == '__main__':
    process_epubs('some local directory', 'some local directory')
于 2013-07-12T18:19:28.233 回答