I have to run a program over 200 files in a loop.
Right now I run them like this:

for combo in it.combinations(files, 2):
    cmd = ["command", combo[0], combo[1]]
    subprocess.Popen(cmd)

I'd like to run only 60 at a time so I don't overwhelm the computer; the command is very processor-intensive. Once 60 processes are running, what's the best way to pause the loop and then start another process as soon as one finishes, so that there are always 60 running?
#!/usr/bin/env python
import itertools
import subprocess
from multiprocessing.dummy import Pool  # use threads

def run(combo):
    cmd = ["command", combo[0], combo[1]]
    return combo, subprocess.call(cmd)

def main():
    p = Pool(60)  # 60 subprocesses at a time
    for combo, rc in p.imap_unordered(run, itertools.combinations(files, 2)):
        print("%s exited with %s" % (combo, rc))
    p.close()
    p.join()

if __name__ == "__main__":
    main()
This answer demonstrates various techniques for limiting the number of concurrent subprocesses: it shows solutions based on multiprocessing.Pool, concurrent.futures, and threading + Queue.
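Since concurrent.futures is mentioned above but not shown, here is a minimal sketch of the same idea with ThreadPoolExecutor. The file names and the harmless stand-in command (`sys.executable -c pass`) are placeholders, not the asker's real inputs:

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from itertools import combinations

files = ["a", "b", "c", "d"]  # placeholder for the asker's 200 files

def run(combo):
    # a harmless stand-in for the real CPU-heavy "command"
    cmd = [sys.executable, "-c", "pass"]
    return combo, subprocess.call(cmd)

# at most 60 worker threads, hence at most 60 child processes at once
with ThreadPoolExecutor(max_workers=60) as pool:
    results = list(pool.map(run, combinations(files, 2)))
```

Each worker thread blocks in subprocess.call until its child exits, so the pool size directly caps the number of live subprocesses.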
This might help:

import itertools as it
import subprocess
import time

files = range(5)
max_load = 3
sleep_interval = 0.5

pid_list = []
for combo in it.combinations(files, 2):
    # Random command that takes time
    cmd = ['sleep', str(combo[0] + combo[1])]

    # Launch and record this command
    print("Launching:", cmd)
    pid = subprocess.Popen(cmd)
    pid_list.append(pid)

    # Wait while the number of still-running processes is at the maximum load
    while len([p for p in pid_list if p.poll() is None]) >= max_load:
        time.sleep(sleep_interval)
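The key trick in the loop above is that Popen.poll() returns None while a child is still running. A small self-contained demonstration of that counting step (the sleep durations here are arbitrary):

```python
import subprocess
import sys
import time

def still_running(procs):
    # Popen.poll() returns None until the child has exited
    return [p for p in procs if p.poll() is None]

# launch three short-lived children
procs = [subprocess.Popen([sys.executable, "-c", "import time; time.sleep(0.2)"])
         for _ in range(3)]

# poll until all of them have finished
while still_running(procs):
    time.sleep(0.05)

exit_codes = [p.wait() for p in procs]
```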
You want something like this:

import queue
import subprocess
import threading

class IPThread(threading.Thread):
    def __init__(self, q, num):
        super(IPThread, self).__init__()
        self.queue = q
        self.num = num
    def run(self):
        while True:
            try:
                args = self.queue.get_nowait()
                cmd = ["echo"] + [str(i) for i in args]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                out, err = p.communicate()
                print(out)
            except queue.Empty:
                # Nothing left in the queue -- we are done
                print("Queue %d done" % self.num)
                break
            except Exception as err:
                # Handle exception
                print(err)
            self.queue.task_done()

def create_threads(q, size):
    for i in range(size):
        thread = IPThread(q, i)
        thread.daemon = True
        thread.start()
    q.join()

def fill_queue(q):
    # Call q.put(args) in a loop to populate the queue with arguments
    from itertools import permutations
    x = list(range(20))
    for arg1, arg2 in permutations(x, 2):
        q.put([arg1, arg2])
    print(q.qsize())

def main():
    q = queue.Queue()
    fill_queue(q)
    create_threads(q, 60)
    print("Done")

if __name__ == '__main__':
    main()
Create a queue of things to work on. Specialize your Thread-derived class. Spin up your threads. Wait for them to finish.
You can tell that the tasks are running concurrently because their output interferes with each other. It's a feature!
You could do something really simple like:

from time import sleep

count = 0
for combo in it.combinations(files, 2):
    while count < 60:
        cmd = ["command", combo[0], combo[1]]
        subprocess.Popen(cmd)
        count = count + 1
    if subprocess_is_done:
        count = count - 1
    sleep(5)

Obviously, you'd need to figure out how to get subprocess_is_done from your command.
As far as I can tell this works for trivial cases, but I have no idea what you're trying to run...
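One way to fill in the missing subprocess_is_done check is to keep the Popen handles and test them with poll(). A sketch of that idea, using a cap of 2 instead of 60 and a hypothetical stand-in command so the demo finishes quickly:

```python
import subprocess
import sys
import time

MAX_RUNNING = 2  # would be 60 in the real script
running = []

def reap():
    # drop children whose poll() is no longer None, i.e. that have exited
    for p in [p for p in running if p.poll() is not None]:
        running.remove(p)

for _ in range(5):
    # block new launches while the cap is reached
    while len(running) >= MAX_RUNNING:
        reap()
        time.sleep(0.01)
    running.append(subprocess.Popen([sys.executable, "-c", "pass"]))

# wait for the stragglers
while running:
    reap()
    time.sleep(0.01)
```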