
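I'm trying to write a kind of task manager in Python. It is based on a job queue, and the main thread is in charge of adding jobs to that queue. I have written this class to handle the queued jobs; it limits the number of concurrent processes and handles the output of finished processes.

Here is the problem: in the _check_jobs_status function, the returncode of each process is never updated. Regardless of the process state (running, finished, ...), job.returncode is always None, so the if statement never fires and I cannot remove finished jobs from the list of processing jobs.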
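
A minimal sketch of the symptom, assuming a short-lived Python child as a stand-in for the real `stress` job (the variable names `proc` and `observed` are for illustration only):

```python
import subprocess
import sys
import time

# Stand-in for a real job: a child process that exits almost immediately.
proc = subprocess.Popen([sys.executable, "-c", "pass"])
time.sleep(1.0)  # give the child ample time to finish

# Read the attribute exactly the way _check_jobs_status does.
observed = proc.returncode
print(observed)

proc.wait()  # reap the child so it does not linger as a zombie
```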

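I know this can be done with process.communicate() or process.wait(), but I don't want to block the thread that launches the processes. Is there another way to do it, perhaps using ProcessPoolExecutor? New jobs can arrive in the queue at any time, and I need to be able to handle them.

Thanks for your time and support :)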

from queue import Queue 
import subprocess
from threading import Thread
from time import sleep


class JobQueueManager(Queue):
    def __init__(self, maxsize: int):
        super().__init__(maxsize)
        self.processing_jobs = []
        self.process = None
        self.jobs_launcher=Thread(target=self._worker_job)
        self.processing_jobs_checker=Thread(target=self._check_jobs_status)
        self.jobs_launcher.start()
        self.processing_jobs_checker.start()


    def _worker_job(self):
        while True:
            # Run at max 3 jobs concurrently
            if self.not_empty and len(self.processing_jobs) < 3:
                # Get job from queue
                job = self.get()
                # Execute a task without blocking the thread
                self.process = subprocess.Popen(job)
                self.processing_jobs.append(self.process)
                # Needed if queue.join() is used to wait for the queue to drain
                self.task_done()
            else:
                print("Waiting 4s for jobs") 
                sleep(4)

    def _check_jobs_status(self):
        while True:
            # Check if jobs are finished
            for job in self.processing_jobs:
                # Successfully completed
                if job.returncode == 0:
                    self.processing_jobs.remove(job)
            # Wait 4 seconds and repeat
            sleep(4)


def main():

    q = JobQueueManager(100)
    task = ["stress", "--cpu", "1", "--timeout", "20"]

    for i in range(10): #put 10 tasks in the queue
        q.put(task)

    q.join() #block until all tasks are done


if __name__ == "__main__":
    main()

1 Answer


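Answering my own question: I came up with a working solution. The JobExecutor class manages the process pool in a custom way. The watch_completed_tasks function watches for tasks as they complete and handles their output. This way everything is done with only two threads, and the main thread is not blocked when submitting processes.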

import subprocess
from threading import Timer
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging


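def launch_job(job):
    process = subprocess.Popen(job, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(f"launching {process.pid}")
    # communicate() drains both pipes together; reading stdout fully before
    # stderr can deadlock if the child fills the stderr pipe buffer first.
    stdout, stderr = process.communicate()
    return [process.pid, stdout, stderr]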


class JobExecutor(ProcessPoolExecutor):
    def __init__(self, max_workers: int):
        super().__init__(max_workers)
        self.futures = []
        self.watch_completed_tasks()

    def submit(self, command):
        future = super().submit(launch_job, command)
        self.futures.append(future)
        return future

    def watch_completed_tasks(self):
        # Manage tasks completion
        for completed_task in as_completed(self.futures):
            print(f"FINISHED  task with PID {completed_task.result()[0]}")
            self.futures.remove(completed_task)

        # Call this function again every 5 seconds
        timer_thread = Timer(5.0, self.watch_completed_tasks)
        timer_thread.name = "TasksWatcher"
        timer_thread.start()

def main():

    executor = JobExecutor(max_workers=5)
    for i in range(10):
        task = ["stress",
        "--cpu", "1",
        "--timeout", str(i+5)]
        executor.submit(task)


if __name__ == "__main__":
    main()
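
For comparison, here is a minimal sketch that stays closer to the polling design in the question. `Popen.poll()` checks a child without blocking and sets `returncode` once the child has exited, so finished processes can be dropped from the list. The helper names `reap_finished` and `run_all`, and the short-lived Python child standing in for `stress`, are assumptions for illustration and not part of the original code.

```python
import subprocess
import sys
import time


def reap_finished(running):
    """Split processes into (still_running, finished) using non-blocking polls."""
    still_running, finished = [], []
    for proc in running:
        # poll() returns None while the child is alive; otherwise it sets
        # and returns proc.returncode. It never blocks.
        if proc.poll() is None:
            still_running.append(proc)
        else:
            finished.append(proc)
    return still_running, finished


def run_all(commands, max_concurrent=3, interval=0.1):
    """Run commands with at most max_concurrent children; return exit codes."""
    pending = list(commands)
    running, codes = [], []
    while pending or running:
        # Start new children while there is a free slot.
        while pending and len(running) < max_concurrent:
            running.append(subprocess.Popen(pending.pop(0)))
        running, finished = reap_finished(running)
        codes.extend(proc.returncode for proc in finished)
        if running:
            time.sleep(interval)  # short pause between polls
    return codes


if __name__ == "__main__":
    # Stand-in for the `stress` command: a child that sleeps briefly.
    cmd = [sys.executable, "-c", "import time; time.sleep(0.2)"]
    print(run_all([cmd] * 5))
```

The loop in `run_all` could run in a background thread, as `_check_jobs_status` does in the question, so the thread that submits jobs never waits on a child.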
Answered 2022-02-26T16:48:26.210