13

我有一小部分工人 (4) 和一个非常大的任务列表 (5000~)。我正在使用一个池并使用 map_async() 发送任务。因为我正在运行的任务相当长,所以我强制将块大小设置为 1,以便一个长进程无法容纳一些较短的进程。

我想做的是定期检查还有多少任务要提交。我知道最多有 4 个处于活动状态,我关心还有多少需要处理。

我用谷歌搜索,找不到任何人这样做。

一些简单的代码来帮助:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break
4

4 回答 4

9

看起来jobs._number_left就是你想要的。_表示它是一个内部值,可能会随开发人员的心血来潮而改变,但它似乎是获取该信息的唯一方法。

于 2011-04-04T20:10:13.120 回答
1

没有我所知道的密封方式,但是如果您使用该Pool.imap_unordered()函数而不是 map_async,则可以拦截已处理的元素。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

我正在减去process_count,因为您几乎可以假设所有进程都将处理以下两个例外之一:1)如果您使用迭代器,则可能没有更多的项目要消耗和处理,以及 2)您可能有更少剩下 4 件以上。我没有为第一个异常编写代码。但是,如果您需要,这样做应该很容易。无论如何,你的例子使用了一个列表,所以你不应该有这个问题。

编辑:我还意识到您正在使用 While 循环,这使您看起来像是在尝试定期更新某些内容,例如每半秒或某事。我作为示例给出的代码不会那样做。我不确定这是否有问题。

于 2011-04-04T19:14:19.850 回答
1

Pool._cache假设您正在使用 ,您可以通过查看属性来检查待处理作业的数量apply_async。这是在ApplyResult它们可用之前存储的位置,并且等于ApplyResult待处理的 s 数量。

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()
于 2018-08-24T05:42:05.023 回答
1

我有类似的要求:跟踪进度,根据结果执行临时工作,在任意时间干净地停止所有处理。我的处理方式是一次发送一个任务apply_async。我所做的一个高度简化的版本:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

请注意,我使用 aQueue而不是return结果。

于 2015-08-20T09:07:14.983 回答