我正在尝试用 Python 编写一种任务管理器。它基于一个作业队列,主线程负责将作业添加到该队列中。我已经让这个类来处理排队的作业,能够限制并发进程的数量并处理完成进程的输出。
问题来了,_check_jobs 函数我没有更新每个进程的返回码值,独立于它的状态(正在运行,完成......)job.returncode 总是无,因此我无法运行 if 语句并删除处理作业列表中的作业。
我知道可以使用 process.communicate() 或 process.wait() 来完成,但我不想阻止启动进程的线程。有没有其他方法可以做到这一点,也许使用ProcessPoolExecutor?队列可以随时被进程击中,我需要能够处理它们。
谢谢大家的时间和支持:)
from queue import Queue
import subprocess
from threading import Thread
from time import sleep
class JobQueueManager(Queue):
def __init__(self, maxsize: int):
super().__init__(maxsize)
self.processing_jobs = []
self.process = None
self.jobs_launcher=Thread(target=self._worker_job)
self.processing_jobs_checker=Thread(target=self._check_jobs_status)
self.jobs_launcher.start()
self.processing_jobs_checker.start()
def _worker_job(self):
while True:
# Run at max 3 jobs concurrently
if self.not_empty and len(self.processing_jobs) < 3:
# Get job from queue
job = self.get()
# Execute a task without blocking the thread
self.process = subprocess.Popen(job)
self.processing_jobs.append(self.process)
# util if queue.join() is used to block the queue
self.task_done()
else:
print("Waiting 4s for jobs")
sleep(4)
def _check_jobs_status(self):
while True:
# Check if jobs are finished
for job in self.processing_jobs:
# Sucessfully completed
if job.returncode == 0:
self.processing_jobs.remove(job)
# Wait 4 seconds and repeat
sleep(4)
def main():
q = JobQueueManager(100)
task = ["stress", "--cpu", "1", "--timeout", "20"]
for i in range(10): #put 10 tasks in the queue
q.put(task)
q.join() #block until all tasks are done
if __name__ == "__main__":
main()