17

我是期货模块的新手,并且有一项可以从并行化中受益的任务;但我似乎无法准确地弄清楚如何为线程设置函数和为进程设置函数。我将不胜感激任何人都可以就此事提供帮助。

我正在运行粒子群优化 (PSO)。无需过多介绍 PSO 本身,以下是我的代码的基本布局:

有一个Particle类,有一个getFitness(self)方法(计算一些度量并将其存储在 中self.fitness)。PSO 模拟有多个粒子实例(很容易超过 10 个;对于某些模拟,有 100 个甚至 1000 个)。
每隔一段时间,我就必须计算粒子的适应度。目前,我在 for 循环中执行此操作:

for p in listOfParticles:
  p.getFitness(args)

但是,我注意到每个粒子的适应度可以相互独立地计算。这使得这种适应度计算成为并行化的主要候选者。确实,我可以做到map(lambda p: p.getFitness(args), listOfParticles)

现在,我可以很容易地做到这一点futures.ProcessPoolExecutor

with futures.ProcessPoolExecutor() as e:
  e.map(lambda p: p.getFitness(args), listOfParticles)

由于调用的副作用p.getFitness存储在每个粒子本身中,我不必担心从futures.ProcessPoolExecutor().

到目前为止,一切都很好。但是现在我注意到它ProcessPoolExecutor会创建新进程,这意味着它会复制内存,这很慢。我希望能够共享内存 - 所以我应该使用线程。这很好,直到我意识到在每个进程中运行多个线程并运行多个进程可能会更快,因为多个线程仍然只在我可爱的 ​​8 核机器的一个处理器上运行。

这是我遇到麻烦的地方:
根据我看到的示例,ThreadPoolExecutorlist. 也是如此ProcessPoolExecutor。所以我不能做任何迭代的事情ProcessPoolExecutorThreadPoolExecutor因为那样ThreadPoolExecutor会得到一个单一的对象来处理(见我的尝试,贴在下面)。
另一方面,我不能对listOfParticles自己进行切片,因为我想ThreadPoolExecutor发挥自己的魔力来弄清楚需要多少线程。

所以,一个大问题(终于)
我应该如何构建我的代码,以便我可以使用进程和线程有效地并行化以下内容:

for p in listOfParticles:
  p.getFitness()

这是我一直在尝试的,但我不敢尝试运行它,因为我知道它不会工作:

>>> def threadize(func, L, mw):
...     with futures.ThreadpoolExecutor(max_workers=mw) as executor:
...             for i in L:
...                     executor.submit(func, i)
... 

>>> def processize(func, L, mw):
...     with futures.ProcessPoolExecutor() as executor:
...             executor.map(lambda i: threadize(func, i, mw), L)
...

我会很感激任何关于如何解决这个问题的想法,甚至是关于如何改进我的方法

万一这很重要,我在 python3.3.2

4

3 回答 3

19

我将为您提供将进程与线程混合以解决问题的工作代码,但这不是您所期望的;-) 首先是制作一个不会危及您的真实数据的模拟程序。尝试一些无害的东西。所以这是开始:

class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None
    def getfitness(self):
        self.fitness = 2 * self.i

现在我们有一些东西可以玩了。接下来是一些常量:

MAX_PROCESSES = 3
MAX_THREADS = 2 # per process
CHUNKSIZE = 100

摆弄那些来品尝。 CHUNKSIZE稍后会解释。

第一个惊喜是我的最低级工作函数的作用。那是因为你在这里过于乐观了:

由于调用 p.getFitness 的副作用存储在每个粒子本身中,因此我不必担心从 futures.ProcessPoolExecutor() 获得返回。

唉,工作进程中所做的任何Particle事情都不会对主程序中的实例产生任何影响。工作进程处理实例的副本Particle无论是通过写时复制实现,还是因为它正在处理通过对跨进程传递的泡菜fork()进行解封而制成的副本。Particle

所以如果你想让你的主程序看到健身结果,你需要安排将信息发送回主程序。因为我对您的实际程序知之甚少,所以在这里我假设它Particle().i是一个唯一的整数,并且主程序可以轻松地将整数映射回Particle实例。考虑到这一点,这里最底层的工作函数需要返回一对:唯一整数和适应度结果:

def thread_worker(p):
    p.getfitness()
    return (p.i, p.fitness)

Particle鉴于此,跨线程传播 s 列表并返回结果列表很容易(particle_id, fitness)

def proc_worker(ps):
    import concurrent.futures as cf
    with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
        result = list(e.map(thread_worker, ps))
    return result

笔记:

  1. 这是每个工作进程将运行的功能。
  2. 我使用的是 Python 3,所以使用list()强制e.map()实现列表中的所有结果。
  3. 正如评论中提到的,在 CPython 下,跨线程分发 CPU 绑定任务比在单个线程中完成所有任务要慢。

只剩下编写代码来Particle跨进程传播 s 列表并检索结果。这很容易做到multiprocessing,所以这就是我要使用的。我不知道是否concurrent.futures可以做到(鉴于我们也在线程中混合),但不在乎。但是因为我给你工作代码,你可以玩它并报告;-)

if __name__ == "__main__":
    import multiprocessing

    particles = [Particle(i) for i in range(100000)]
    # Note the code below relies on that particles[i].i == i
    assert all(particles[i].i == i for i in range(len(particles)))

    pool = multiprocessing.Pool(MAX_PROCESSES)
    for result_list in pool.imap_unordered(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
        for i, fitness in result_list:
            particles[i].fitness = fitness

    pool.close()
    pool.join()

    assert all(p.fitness == 2*p.i for p in particles)

笔记:

  1. 我正在Particle“手动”将 s 列表分成块。这CHUNKSIZE就是为了。那是因为一个工作进程想要一个 s 的列表Particle工作,而这又是因为这就是futures map()函数想要的。无论如何,将工作分块是一个好主意,因此您可以获得一些真正的收益,以换取每次调用的进程间开销。
  2. imap_unordered()不保证返回结果的顺序。这给了实现更多的自由来尽可能有效地安排工作。而且我们不关心这里的顺序,所以没关系。
  3. 请注意,循环检索(particle_id, fitness)结果,并Particle相应地修改实例。也许您的真实.getfitness对实例进行了其他突变Particle-无法猜测。无论如何,主程序永远不会看到工人“通过魔法”产生的任何突变 - 你必须明确安排。在限制中,您可以(particle_id, particle_instance)改为返回对,并替换Particle主程序中的实例。然后它们会反映工作进程中发生的所有突变。

玩得开心 :-)

期货一路下跌

事实证明它很容易更换multiprocessing。以下是变化。这也(如前所述)替换了原始Particle实例,以便捕获所有突变。但是,这里有一个折衷:腌制一个实例比腌制单个“适合度”结果需要“更多”字节。更多的网络流量。选择你的毒药;-)

返回变异的实例只需要替换 的最后一行thread_worker(),如下所示:

return (p.i, p)

然后用这个替换所有的“ main ”块:

def update_fitness():
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
        for result_list in e.map(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
            for i, p in result_list:
                particles[i] = p

if __name__ == "__main__":
    particles = [Particle(i) for i in range(500000)]
    assert all(particles[i].i == i for i in range(len(particles)))

    update_fitness()

    assert all(particles[i].i == i for i in range(len(particles)))
    assert all(p.fitness == 2*p.i for p in particles)

multiprocessor代码与舞蹈非常相似。就个人而言,我会使用该multiprocessing版本,因为imap_unordered它很有价值。这是简化界面的一个问题:他们经常以隐藏有用的可能性为代价来购买简单性。

于 2013-11-25T01:01:58.923 回答
4

首先,您确定在为所有内核加载进程的同时利用运行多线程吗?如果它是 cpu-bound,几乎没有。至少必须进行一些测试。

如果添加线程会影响您的性能,那么下一个问题是是否可以通过手动负载平衡或自动负载平衡来获得更好的性能。我所说的手工是指仔细地将工作负载划分为具有相似计算复杂性的块,并为每个块设置一个新的任务处理器,这是您最初但值得怀疑的解决方案。通过自动创建进程/线程池并在工作队列上为您争取的新任务进行通信。在我看来,第一种方法是 Apache Hadoop 范式之一,第二种方法是由工作队列处理器实现,例如 Celery。第一种方法可能会遇到一些任务块变慢并运行而另一些完成,第二种方法增加了通信和等待任务的开销,这是要进行的性能测试的第二点。

最后,如果您希望拥有一个包含多线程的静态进程集合,AFAIK,您无法concurrent.futures按原样实现它,并且必须对其进行一些修改。我不知道是否有针对此任务的现有解决方案,但作为concurrent纯 python 解决方案(没有 C 代码),它可以轻松完成。工作处理器是在类的_adjust_process_count 例程中定义的ProcessPoolExecutor,并且使用多线程方法对其进行子类化和覆盖是相当简单的,您只需要提供您的自定义_process_worker,基于concurrent.features.thread

原文ProcessPoolExecutor._adjust_process_count供参考:

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p
于 2013-11-15T06:56:01.157 回答
1

这是一个通用的答案,它利用实现的threadedprocessThreadedProcesPoolExecutor,允许在进程池中组合使用线程池。下面是一个使用它的通用实用程序函数:

import concurrent.futures
import logging
from typing import Callable, Iterable, Optional

import threadedprocess

log = logging.getLogger(__name__)


def concurrently_execute(fn: Callable, fn_args: Iterable, max_processes: Optional[int] = None, max_threads_per_process: Optional[int] = None) -> None:
    """Execute the given callable concurrently using multiple threads and/or processes."""
    # Ref: https://stackoverflow.com/a/57999709/
    if max_processes == 1:
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads_per_process)
    elif max_threads_per_process == 1:
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_processes)  # type: ignore
    else:
        executor = threadedprocess.ThreadedProcessPoolExecutor(max_processes=max_processes, max_threads=max_threads_per_process)

    if max_processes and max_threads_per_process:
        max_workers = max_processes * max_threads_per_process
        log.info("Using %s with %s processes and %s threads per process, i.e. with %s workers.", executor.__class__.__name__, max_processes, max_threads_per_process, max_workers)

    with executor:
        futures = [executor.submit(fn, *fn_args_cur) for fn_args_cur in fn_args]

    for future in concurrent.futures.as_completed(futures):
        future.result()  # Raises exception if it occurred in process worker.
于 2019-09-18T19:36:21.763 回答