6

我在 Python 3 中生成具有超时的异步子进程时遇到问题。

我想要实现的目标:我想在不等待结果的情况下异步生成多个进程,但我也希望确保每个生成的进程都将在给定的超时时间内结束。

我在这里发现了类似的问题:Using module 'subprocess' with timeout and Asynchronous background processes in Python? 但他们没有解决我的问题。

我的代码看起来像这样。我有命令类,如Using module 'subprocess' with timeout中所建议的那样:

class Command(object):
  def __init__(self, cmd):
    self.cmd = cmd
    self.process = None

  def run(self, timeout):
    def target():
      print('Thread started')
      args = shlex.split(self.cmd)
      self.process = subprocess.Popen(args, shell=True)
      self.process.communicate()
      print('Thread finished')

    thread = threading.Thread(target=target)
    thread.start()

    thread.join(timeout)
    if thread.is_alive():
      print('Terminating process')
      self.process.terminate()
      thread.join()

然后当我想产生子进程时:

for system in systems:
  for service in to_spawn_system_info:
    command_str = "cd {0} && python proc_ip.py {1} {2} 0 2>>{3}".format(home_dir,
        service, system, service_log_dir)
    command = Command(command_str)
    command.run(timeout=60)

当我运行它时,输出似乎等待每个命令产生并结束。我明白了

Thread started
Thread finished
Thread started
Thread finished
Thread started
Thread finished
Thread started
Thread finished

所以我的问题是我做错了什么?现在我开始怀疑是否有可能产生一个进程并通过超时限制它的执行。

为什么我需要这个?spawner 脚本将在 cron 中运行。它将每 10 分钟执行一次,它必须产生大约 20 个子进程。我想保证每个子进程都会在脚本从 cron 再次运行之前结束。

4

3 回答 3

2

如前所述,对 process.communicate() 的调用使您的代码等待子流程的完成。但是,如果您只是删除了communicate() 调用,线程将在生成进程后立即退出,从而导致您的thread.join() 调用过早退出,并且您将过早地终止子进程。要在不轮询或忙于等待的情况下做你想做的事,你可以设置一个计时器,如果进程尚未完成,它将在超时后终止进程(和运行线程):

class Command(object):
  def __init__(self, cmd):
    self.cmd = cmd
    self.process = None

  def run(self, timeout):
    def target():
      print('Thread started')
      # May want/need to skip the shlex.split() when using shell=True
      # See Popen() constructor docs on 'shell' argument for more detail.
      args = shlex.split(self.cmd)
      self.process = subprocess.Popen(args, shell=True)
      self.timer.start()
      self.process.wait()
      self.timer.cancel()

    def timer_callback():
        print('Terminating process (timed out)')
        self.process.terminate()

    thread = threading.Thread(target=target)
    self.timer = threading.Timer(timeout, timer_callback)
    thread.start()
于 2013-05-14T13:01:02.690 回答
1

使用彼此独立开始和结束的线程。如果您提前知道要运行的所有命令,此方法将很有用。这是一个例子......

from threading import Thread
import subprocess
import Queue
import multiprocessing

class Command(object):
    def __init__(self, cmds):
        self.cmds = cmds

    def run_cmds(self):
        cmd_queue = Queue.Queue()
        for cmd in self.cmds:
            cmd_queue.put(cmd)

        available_threads = multiprocessing.cpu_count()
        for x in range(0,available_threads):
            t = Thread(target=self.run_cmd,args=(cmd_queue,))
            t.setDaemon(True)
            t.start()

        cmd_queue.join()


    def run_cmd(self, cmd_queue):
        while True:
            try: cmd = cmd_queue.get()
            except: break
            print 'Thread started'
            process = subprocess.Popen(cmd, shell=True)
            process.communicate()
            print 'Thread finished'
            cmd_queue.task_done()


# create list of commands you want to run
cmds = ['cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop']
# create class
c = Command(cmds)
# run them...
c.run_cmds()

这将打印....

Thread started
Thread started
 Thread started
 Thread startedThread finished

Thread started
Thread finishedThread finished

Thread finished
Thread finished

正如您从输出中看到的那样,子进程彼此独立地开始和结束,并且没有子进程等待另一个子进程完成,因为它们都在不同的线程中调用。当然,您可以添加超时和其他任何您想要的,这只是一个简单的示例。这假设您知道要运行的所有命令。如果您想添加线程超时,请参阅 epicbrews 答案。如果您愿意,您可以将他的线程超时示例合并到这个示例中。

于 2013-05-14T12:16:50.313 回答
0
from threading import *
from time import time
import shlex
import subprocess
from random import randint
class Worker(Thread):
    def __init__(self, param, cmd, timeout=10):
        self.cmd = cmd
        self.timeout = timeout

        Thread.__init__(self)
        self.name = param
    def run(self):
        startup = time()
        print(self.name + ' is starting')

        args = shlex.split(self.cmd)
        #Shell should be false when given a list (True for strings)
        process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

        while time()-startup <= self.timeout:
            if process.poll() != None:
                break

        process.stdout.close()
        process.stdin.close()
        process.stderr.close()

        print(self.name + ' is dead')

for i in range(0, 100):
    x = Worker('Name-'+str(i), 'ping -n ' + str(randint(0,5)) + ' www.google.se')
    x.start()

while len(enumerate()) > 1:
    pass # Wait for the threads to die

这能简化你的工作方法吗?特别是考虑到您不需要等待结果,这只会将一个类对象发射到外层空间为您执行工作,超时为 c。

另请注意:

  • 不关闭标准输出、标准输入和标准错误将导致几乎所有系统上的“打开多个文件句柄”
  • 正如在另一个答案中指出的那样, .communicate() 等待进程退出(.poll()改为使用)
于 2013-05-14T11:55:23.393 回答