2

因此,我知道标题中两种方法之间的区别,但不知道实际含义。

据我了解:如果您使用的 NUM_WORKERS 多于实际可用的内核,您将面临性能大幅下降,因为您的操作系统会不断地来回切换以保持并行。不知道这是多么真实,但我在这里从比我聪明的人那里读到它。

在它的文档中os.cpu_count()说:

返回系统中的 CPU 数量。如果未确定,则返回 None。这个数字不等于当前进程可以使用的 CPU 数量。可以使用 len(os.sched_getaffinity(0)) 获得可用 CPU 的数量

因此,我试图弄清楚“系统”指的是一个进程是否可以使用比“系统”中更多的 CPU。

我只想安全有效地实现multiprocessing.pool功能。所以这是我总结的问题:

有什么实际意义:

NUM_WORKERS = os.cpu_count() - 1
# vs.
NUM_WORKERS = len(os.sched_getaffinity(0)) - 1

-1是因为我发现如果我在处理数据时尝试工作,我的系统会少很多。

4

3 回答 3

4

这两个函数非常不同,并且NUM_WORKERS = os.sched_getaffinity(0) - 1会立即失败,TypeError因为您尝试从集合中减去一个整数。虽然os.cpu_count()告诉您系统有多少核心,但告诉您允许os.sched_getaffinity(pid)某个线程/进程在哪些核心上运行。


os.cpu_count()

os.cpu_count()显示操作系统已知的可用内核数(虚拟内核)。很可能你有一半的物理核心。如果使用比物理内核更多的进程甚至比虚拟内核更多的进程是有意义的,那么很大程度上取决于你在做什么。计算循环越紧密(指令的多样性很小,缓存未命中的次数很少,...),您就越有可能不会从更多使用的内核中受益(通过使用更多的工作进程),甚至会遇到性能下降。

显然,它还取决于您的系统正在运行的其他内容,因为您的系统会尝试为系统中的每个线程(作为进程的实际执行单元)分配公平的可用内核运行时间。因此,就应该使用多少工人而言,不可能一概而论。但是,例如,如果您有一个紧密的循环并且您的系统处于空闲状态,那么优化的一个很好的起点是

os.cpu_count() // 2 # same as mp.cpu_count() // 2 

...并从那里增加。

@Frank Yellin 已经提到如何multiprocessing.Pool使用os.cpu_count()工人数量作为默认值。

os.sched_getaffinity(pid)

os.sched_getaffinity(pid)

返回具有 PID pid 的进程(或当前进程,如果为零)被限制到的 CPU 集。

现在 core/cpu/processor/-affinity 是关于允许您的线程(在您的工作进程内)运行的具体(虚拟)核心。您的操作系统为每个核心提供一个 id,从 0 到 (number-of-cores - 1) 并且更改亲和力允许限制(“固定”)某个线程允许在哪些实际核心上运行。

至少在 Linux 上,我发现这意味着如果当前没有可用的允许内核,则子进程的线程将不会运行,即使其他不允许的内核空闲也是如此。所以“亲和力”在这里有点误导。

摆弄亲和性的目标是最大程度地减少上下文切换和核心迁移造成的缓存失效。您的操作系统通常具有更好的洞察力,并且已经尝试通过其调度策略保持缓存“热”,因此除非您知道自己在做什么,否则您不能期望从干扰中轻松获得收益。

默认情况下,亲和力设置为所有核心和 for multiprocessing.Pool,至少在您的系统处于空闲状态时,更改它并没有太大意义。

请注意,尽管此处的文档谈到“进程”,但设置亲和力确实是每个线程的事情。因此,例如,在“子”线程中为“当前进程如果为零”设置亲和力,不会改变主线程或进程内其他线程的亲和力。但是,子线程从主线程继承它们的亲和性,而子进程(通过它们的主线程)从父进程的主线程继承亲和性。这会影响所有可能的启动方法(“spawn”、“fork”、“forkserver”)。下面的示例演示了这一点以及如何使用 using 修改亲和力multiprocessing.Pool

import multiprocessing as mp
import threading
import os


def _location():
    return f"{mp.current_process().name} {threading.current_thread().name}"


def thread_foo():
    print(f"{_location()}, affinity before change: {os.sched_getaffinity(0)}")
    os.sched_setaffinity(0, {4})
    print(f"{_location()}, affinity after change: {os.sched_getaffinity(0)}")


def foo(_, iterations=200e6):

    print(f"{_location()}, affinity before thread_foo:"
          f" {os.sched_getaffinity(0)}")

    for _ in range(int(iterations)):  # some dummy computation
        pass

    t = threading.Thread(target=thread_foo)
    t.start()
    t.join()

    print(f"{_location()}, affinity before exit is unchanged: "
          f"{os.sched_getaffinity(0)}")

    return _


if __name__ == '__main__':

    mp.set_start_method("spawn")  # alternatives on Unix: "fork", "forkserver"

    # for current process, exclude cores 0,1 from affinity-mask
    print(f"parent affinity before change: {os.sched_getaffinity(0)}")
    excluded_cores = {0, 1}
    os.sched_setaffinity(0, os.sched_getaffinity(0).difference(excluded_cores))
    print(f"parent affinity after change: {os.sched_getaffinity(0)}")

    with mp.Pool(2) as pool:
        pool.map(foo, range(5))

输出:

parent affinity before change: {0, 1, 2, 3, 4, 5, 6, 7}
parent affinity after change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-1, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-1, affinity after change: {4}
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-1, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-1, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-2, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-2, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-2, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-2, affinity after change: {4}
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-3, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-3, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
于 2020-10-04T12:36:26.307 回答
3

如果您有一个纯 100% CPU 限制的任务,即除了计算什么都不做,那么很明显,如果进程池大小大于您计算机上可用的 CPU 数量,则不会/可能获得任何东西。但是,如果有混合的 I/O 被抛出,一个进程会放弃等待 I/O 完成的 CPU(或者,例如,从网站返回的 URL,这需要相对较长的时间) ? 对我来说,不清楚在这种情况下您是否无法在进程池大小超过os.cpu_count().

更新

这是演示这一点的代码。这段代码可能最好通过使用线程来处理,它正在使用进程。我的桌面上有 8 个内核。该程序简单地同时检索 54 个 URL(或在这种情况下并行)。程序传递了一个参数,即要使用的池的大小。不幸的是,仅仅创建额外的进程就会产生初始开销,因此如果创建太多进程,节省的成本就会开始下降。但是如果任务运行时间很长并且有很多 I/O,那么创建进程的开销最终是值得的:

from concurrent.futures import ProcessPoolExecutor, as_completed
import requests
from timing import time_it

def get_url(url):
    resp = requests.get(url, headers={'user-agent': 'my-app/0.0.1'})
    return resp.text


@time_it
def main(poolsize):
    urls = [
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
    ]
    with ProcessPoolExecutor(poolsize) as executor:
        futures = {executor.submit(get_url, url): url for url in urls}
        for future in as_completed(futures):
            text = future.result()
            url = futures[future]
            print(url, text[0:80])
            print('-' * 100)

if __name__ == '__main__':
    import sys
    main(int(sys.argv[1]))

8个进程:(我拥有的核心数):

func: main args: [(8,), {}] took: 2.316840410232544 sec.

16道工序:

func: main args: [(16,), {}] took: 1.7964842319488525 sec.

24 道工序:

func: main args: [(24,), {}] took: 2.2560818195343018 sec.
于 2020-10-04T13:18:21.970 回答
1

multiprocessing.pool 的实现使用

        if processes is None:
            processes = os.cpu_count() or 1

不确定这是否能回答您的问题,但至少它是一个数据点。

于 2020-10-03T22:10:18.650 回答