0

我正在开发一个需要运行两个不同 CPU 密集型功能的项目。因此,使用多处理方法似乎是要走的路。我面临的挑战是一个函数的运行时间比另一个函数慢。为了争论,假设它execute的运行时间为 0.1 秒,而运行update时间为一整秒。目标是在update运行execute时将计算一个输出值 10 次。一旦update完成,它需要传递一组参数,execute然后可以继续使用新的参数集生成输出。一段时间后update需要再次运行并再次生成一组新参数。

此外,这两个函数都需要一组不同的输入变量。

下面的图片链接应该希望能更好地可视化我的难题。

函数运行时可视化

根据我收集到的信息(https://zetcode.com/python/multiprocessing/),使用非对称映射方法可能是可行的方法,但它似乎并没有真正起作用。任何帮助是极大的赞赏。

伪代码

from multiprocessing import Pool
from datetime import datetime
import time
import numpy as np


class MyClass():
    def __init__(self, inital_parameter_1, inital_parameter_2):
        self.parameter_1 = inital_parameter_1
        self.parameter_2 = inital_parameter_2

    def execute(self, input_1, input_2, time_in):
        print('starting execute function for time:' + str(time_in))
        time.sleep(0.1)  # wait for 100 milliseconds
        # generate some output
        output = (self.parameter_1 * input_1) + (self.parameter_2 + input_2)
        print('exiting execute function')
        return output

    def update(self, update_input_1, update_input_2, time_in):
        print('starting update function for time:' + str(time_in))
        time.sleep(1)  # wait for 1 second
        # generate parameters
        self.parameter_1 += update_input_1
        self.parameter_2 += update_input_2
        print('exiting update function')

    def smap(f):
        return f()


if __name__ == "__main__":
    update_input_1 = 3
    update_input_2 = 4
    input_1 = 0
    input_2 = 1
    # initialize class
    my_class = MyClass(1, 2)

    # total runtime (arbitrary)
    runtime = int(10e6)
    # update_time (arbitrary)
    update_time = np.array([10, 10e2, 15e4, 20e5])

    for current_time in range(runtime):
        # if time equals update time run both functions simultanously until update is complete
        if any(update_time == current_time):
            with Pool() as pool:
                res = pool.map_async(my_class.smap, [my_class.execute(input_1, input_2, current_time),
                                                     my_class.update(update_input_1, update_input_2, current_time)])
        # otherwise run only execute
        else:
            output = my_class.execute(input_1, input_2,current_time)
        
        # increment input 
        input_1 += 1
        input_2 += 2
4

1 回答 1

0

我承认无法完全遵循您的代码相对于您的描述。但我看到一些问题:

  1. 方法update没有返回除 之外的任何值None,由于缺少return语句而隐式返回。
  2. 您的with Pool() ...:块将terminate在块退出时调用,这是在您调用pool.map_async非阻塞之后立即调用的。但是你没有办法等待这个提交的任务完成(terminate很可能会在它完成之前杀死正在运行的任务)。
  3. 您传递给map_async函数的是工作函数名称和一个iterable。但是您正在从当前主进程调用方法调用,execute并将它们的返回值用作可迭代的元素,而这些返回值绝对不是适合传递给. 所以没有进行多处理,这完全是错误的。updatesmap
  4. 您还一遍又一遍地创建和销毁进程池。最好只创建一次进程池。

因此,我至少建议进行以下更改。但请注意,此代码生成任务的速度可能比完成它们的速度要快得多,并且根据您的当前值,您可能会有数百万个任务排队等待运行runtime,这可能会对系统资源(例如内存)造成很大压力。所以我插入了一些代码来确保提交任务的速率受到限制,这样未完成的提交任务的数量永远不会超过可用 CPU 内核数量的三倍。

# we won't need heavy-duty numpy for what we are doing:
#import numpy as np
from multiprocessing import cpu_count
from threading import Lock
... # etc.

if __name__ == "__main__":
    update_input_1 = 3
    update_input_2 = 4
    input_1 = 0
    input_2 = 1
    # initialize class
    my_class = MyClass(1, 2)

    # total runtime (arbitrary)
    runtime = int(10e6)
    # update_time (arbitrary)
    # we don't need overhead of numpy (remove import of numpy):
    #update_time = np.array([10, 10e2, 15e4, 20e5])
    update_time = [10, 10e2, 15e4, 20e5]

    tasks_submitted = 0
    lock = Lock()

    execute_output = []
    def execute_result(result):
        global tasks_submitted

        with lock:
            tasks_submitted -= 1
        # result is the return value from method execute
        # do something with it, e.g. execute_output.append(result)
        pass

    update_output = []
    def update_result(result):
        global tasks_submitted

        with lock:
            tasks_submitted -= 1
        # result is the return value from method update
        # do something with it, e.g. update_output.append(result)
        pass

    n_processors = cpu_count()
    with Pool() as pool:
        for current_time in range(runtime):
            # if time equals update time run both functions simultanously until update is complete
            #if any(update_time == current_time):
            if current_time in update_time:
                # run both update and execute:
                pool.apply_async(my_class.update, args=(update_input_1, update_input_2, current_time), callback=update_result)
                with lock:
                    tasks_submitted += 1
            pool.apply_async(my_class.execute, args=(input_1, input_2, current_time), callback=execute_result)
            with lock:
                tasks_submitted += 1

            # increment input
            input_1 += 1
            input_2 += 2
            while tasks_submitted > n_processors * 3:
                time.sleep(.05)
        # Ensure all tasks have completed:
        pool.close()
        pool.join()
        assert(tasks_submitted == 0)
于 2021-07-11T14:40:06.673 回答