11

我想使用一个进程池运行多个作业并应用给定的超时,之后应该杀死一个作业并由另一个正在处理下一个任务的作业替换。

我尝试使用multiprocessing提供异步运行工人池的方法的模块(例如使用map_async),但在那里我只能设置一个“全局”超时,之后所有进程都将被杀死。

是否有可能有一个单独的超时,之后只有一个耗时太长的进程被杀死,而是再次将一个新的工作人员添加到池中(处理下一个任务并跳过超时的那个)?

这是一个简单的例子来说明我的问题:

def Check(n):
  import time
  if n % 2 == 0: # select some (arbitrary) subset of processes
    print "%d timeout" % n
    while 1:
      # loop forever to simulate some process getting stuck
      pass
  print "%d done" % n
  return 0

from multiprocessing import Pool
pool = Pool(processes=4)
result = pool.map_async(Check, range(10))
print result.get(timeout=1)    

超时后,所有工作人员都被杀死,程序退出。相反,我希望它继续下一个子任务。我必须自己实施这种行为还是有现有的解决方案?

更新

可以杀死悬挂的工人,他们会被自动替换。所以我想出了这个代码:

jobs = pool.map_async(Check, range(10))
while 1:
  try:
    print "Waiting for result"
    result = jobs.get(timeout=1)
    break # all clear
  except multiprocessing.TimeoutError: 
    # kill all processes
    for c in multiprocessing.active_children():
      c.terminate()
print result

现在的问题是循环永远不会退出;即使在处理完所有任务后,调用也会get产生超时异常。

4

3 回答 3

10

为解决这些类型的问题而构建了pebble Pool 模块。它支持给定任务的超时,允许检测它们并轻松恢复。

from pebble import ProcessPool
from concurrent.futures import TimeoutError

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1,2], timeout=5)

try:
    result = future.result()
except TimeoutError:
    print "Function took longer than %d seconds" % error.args[1]

对于您的具体示例:

from pebble import ProcessPool
from concurrent.futures import TimeoutError

results = []

with ProcessPool(max_workers=4) as pool:
    future = pool.map(Check, range(10), timeout=5)

    iterator = future.result()

    # iterate over all results, if a computation timed out
    # print it and continue to the next result
    while True:
        try:
            result = next(iterator)
            results.append(result)
        except StopIteration:
            break  
        except TimeoutError as error:
            print "function took longer than %d seconds" % error.args[1] 

print results
于 2015-07-02T13:16:07.263 回答
3

目前,Python 不提供本地方法来控制工作程序本身之外的池中每个不同任务的执行时间。
因此,简单的方法是wait_procspsutil模块中使用并将任务实现为子进程。
如果不需要非标准库,那么您必须在主进程中具有工作周期的poll()进程模块的基础上实现自己的池, - 执行每个工作人员并执行所需的操作。

至于更新的问题,如果您直接终止其中一名工作人员,则池会损坏(这是解释器实现中的错误,因为不应允许这种行为):重新创建工作人员,但任务丢失并且池变得不可连接。您必须终止所有池,然后为其他任务重新创建它:

from multiprocessing import Pool
while True:
    pool = Pool(processes=4)
    jobs = pool.map_async(Check, range(10))
    print "Waiting for result"
    try:
        result = jobs.get(timeout=1)
        break # all clear
    except multiprocessing.TimeoutError: 
        # kill all processes
        pool.terminate()
        pool.join()
print result    

更新

Pebble是一个优秀且方便的库,它解决了这个问题。Pebble是为 Python 函数的异步执行而设计的,而PyExPool是为模块和外部可执行文件的异步执行而设计的,尽管两者可以互换使用。

另一个方面是,当 3dparty 依赖项不受欢迎时,PyExPool可能是一个不错的选择,它是多进程执行池的单文件轻量级实现,具有每个作业和全局超时、将作业分组到任务和其他功能的机会.
PyExPool可以嵌入到您的源代码中并进行定制,具有宽松的 Apache 2.0 许可证和生产质量,用于一个高负载科学基准测试框架的核心。

于 2015-05-05T13:22:45.390 回答
0

尝试将每个进程与单独线程上的超时连接的构造。所以主程序永远不会卡住,如果卡住了,也会因为超时而被杀死。这种技术是线程和多处理模块的组合。

这是我在内存中保持最小 x 线程数的方法。它是线程和多处理模块的组合。对于其他技术,如受人尊敬的成员在上面解释过,这可能是不寻常的,但可能非常值得。为了解释起见,我假设一次抓取至少 5 个网站。

所以这里是:-

#importing dependencies.
from multiprocessing import Process
from threading import Thread
import threading

# Crawler function
def crawler(domain):
    # define crawler technique here.
    output.write(scrapeddata + "\n")
    pass

接下来是threadController函数。该函数将控制线程流向主内存。它将继续激活线程以维持 threadNum“最小”限制,即。5.它也不会退出,直到所有活动线程(acitveCount)都完成。

它将保持最少的 threadNum(5) startProcess 函数线程(这些线程最终会从 processList 启动进程,同时在 60 秒内加入它们)。启动 threadController 后,将有 2 个线程不包括在上述 5 个限制中,即。Main 线程和 threadController 线程本身。这就是为什么使用 threading.activeCount() != 2 的原因。

def threadController():
    print "Thread count before child thread starts is:-", threading.activeCount(), len(processList)
    # staring first thread. This will make the activeCount=3
    Thread(target = startProcess).start()
    # loop while thread List is not empty OR active threads have not finished up.
    while len(processList) != 0 or threading.activeCount() != 2:
        if (threading.activeCount() < (threadNum + 2) and # if count of active threads are less than the Minimum AND
            len(processList) != 0):                            # processList is not empty
                Thread(target = startProcess).start()         # This line would start startThreads function as a seperate thread **

startProcess 函数作为一个单独的线程,将从进程列表中启动进程。这个函数的目的(**作为一个不同的线程开始)是它将成为进程的父线程。因此,当它将以 60 秒的超时时间加入它们时,这将停止 startProcess 线程继续前进,但这不会停止 threadController 执行。因此,这样,threadController 将按要求工作。

def startProcess():
    pr = processList.pop(0)
    pr.start()
    pr.join(60.00) # joining the thread with time out of 60 seconds as a float.

if __name__ == '__main__':
    # a file holding a list of domains
    domains = open("Domains.txt", "r").read().split("\n")
    output = open("test.txt", "a")
    processList = [] # thread list
    threadNum = 5 # number of thread initiated processes to be run at one time

    # making process List
    for r in range(0, len(domains), 1):
        domain = domains[r].strip()
        p = Process(target = crawler, args = (domain,))
        processList.append(p) # making a list of performer threads.

    # starting the threadController as a seperate thread.
    mt = Thread(target = threadController)
    mt.start()
    mt.join() # won't let go next until threadController thread finishes.

    output.close()
    print "Done"

除了在内存中保持最少数量的线程外,我的目标是还有一些东西可以避免内存中的线程或进程卡住。我使用超时功能做到了这一点。对于任何打字错误,我深表歉意。

我希望这个建筑能帮助这个世界上的任何人。

问候,

维卡斯·高塔姆

于 2015-09-06T16:46:05.157 回答