
I am trying to create a number of files that will be analyzed by an external program, as part of a high-throughput analysis written in Python.

for foo in X:
    write foo_file
    os.system(run_program foo_file)

For 15,000 separate individual files, this would run much faster if I could run them on multiple cores, but I don't want to swamp my server. How do I set up multiple threads to run through os, while also setting a maximum number of threads? I'm not worried about the speed of the spawning process, since the runtime is dominated by an external program that is standard in my field.

I have looked at the documentation on threading and multiprocessing and am overwhelmed.


2 Answers


One simple way to limit the total number of processes you spawn is to use a multiprocessing Pool.

A simple example demonstrating a multiprocessing pool:

test.py

from multiprocessing.pool import Pool
# @NOTE: The two imports below are for demo purposes and won't be necessary in
# your final program
import random
import time

def writeOut(index):
    """ A function which prints a start message, delays for a random interval and then
        prints a finish message
    """
    delay = random.randint(1,5)                                                                                                                                             
    print("Starting process #{0}".format(index))
    time.sleep(delay)
    print("Finished process #{0} which delayed for {1}s.".format(index, delay))

# Create a process pool with a maximum of 10 worker processes
pool = Pool(processes=10)
# Map our function to a data set - number 1 through 20
pool.map(writeOut, range(20))

This should give you output similar to:

[mike@tester ~]$ python test.py 
Starting process #0
Starting process #2
Starting process #3
Starting process #1
Starting process #4
Starting process #5
Starting process #6
Starting process #7
Starting process #8
Starting process #9
Finished process #2 which delayed for 1s.
Starting process #10
Finished process #7 which delayed for 1s.
Finished process #6 which delayed for 1s.
Starting process #11
Starting process #12
Finished process #9 which delayed for 2s.
Finished process #12 which delayed for 1s.
Starting process #13
Starting process #14
Finished process #1 which delayed for 3s.
Finished process #5 which delayed for 3s.
Starting process #15
Starting process #16
Finished process #8 which delayed for 3s.
Starting process #17
Finished process #4 which delayed for 4s.
Starting process #18
Finished process #10 which delayed for 3s.
Finished process #13 which delayed for 2s.
Starting process #19
Finished process #0 which delayed for 5s.
Finished process #3 which delayed for 5s.
Finished process #11 which delayed for 4s.
Finished process #15 which delayed for 2s.
Finished process #16 which delayed for 2s.
Finished process #18 which delayed for 2s.
Finished process #14 which delayed for 4s.
Finished process #17 which delayed for 5s.
Finished process #19 which delayed for 5s.

As you can see, the first ten processes start, and each subsequent process starts only once another pool worker has finished (become available). Using multiple processes (rather than multiple threads) also sidesteps the Global Interpreter Lock (GIL).
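As an aside, if you would rather size the pool to the machine than hard-code 10, Pool() with no processes argument defaults to the number of CPUs. A small sketch:

```python
import multiprocessing
from multiprocessing.pool import Pool

# With no "processes" argument, Pool defaults to multiprocessing.cpu_count()
# worker processes.
n_cores = multiprocessing.cpu_count()
pool = Pool()  # equivalent to Pool(processes=n_cores)
pool.close()
pool.join()
print(n_cores)
```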

To make this example code work for your task, write a file-output function and pass it, together with an iterable of the file data to be written, to pool.map() in place of writeOut and range(20).

answered 2013-01-30T07:19:37.400

Try this:

import os
import threading
import Queue  # On Python 3 this module is named "queue"

class ThreadWriteFile(threading.Thread):
    def __init__(self, queue_to_write, queue_to_run):
        threading.Thread.__init__(self)
        self.queue_to_write = queue_to_write
        self.queue_to_run = queue_to_run

    def run(self):
        while True:
            foo = self.queue_to_write.get()
            foo_file = write_file(foo)  # your own function that writes foo to disk
            self.queue_to_run.put(foo_file)
            self.queue_to_write.task_done()

class ThreadRunProgram(threading.Thread):
    def __init__(self, queue_to_run):
        threading.Thread.__init__(self)
        self.queue_to_run = queue_to_run

    def run(self):
        while True:
            foo_file = self.queue_to_run.get()
            os.system('run_program ' + foo_file)  # your external program
            self.queue_to_run.task_done()

queue_to_write = Queue.Queue()
queue_to_run = Queue.Queue()

# Start a fixed number of daemon workers so the thread count stays capped,
# rather than spawning one thread pair per item in X.
MAX_THREADS = 10
for _ in range(MAX_THREADS):
    twf = ThreadWriteFile(queue_to_write, queue_to_run)
    twf.daemon = True
    twf.start()

    trf = ThreadRunProgram(queue_to_run)
    trf.daemon = True
    trf.start()

for foo in X:
    queue_to_write.put(foo)

queue_to_write.join()
queue_to_run.join()
answered 2013-01-30T06:58:31.250