0

我正在尝试创建许多并行进程来利用 32 核机器,但是当我查看顶部屏幕时,它只显示了 5 个 Python 进程。这是我的代码:

max_processes = min(len(corpus_paths), cpu_count()*2)
__log.debug("Max processes being used: " + str(max_processes))
pool = Pool(max_processes)
for path in corpus_paths:
    pool.apply_async(...)
pool.close()
pool.join()

这是机器的配置:

[minh.lengoc@compute-1-5 ~]$ lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                32
On-line CPU(s) list:   0-31
Thread(s) per core:    2
Core(s) per socket:    8
CPU socket(s):         2
NUMA node(s):          4
Vendor ID:             AuthenticAMD
CPU family:            21
Model:                 1
Stepping:              2
CPU MHz:               2099.877
BogoMIPS:              4199.44
Virtualization:        AMD-V
L1d cache:             16K
L1i cache:             64K
L2 cache:              2048K
L3 cache:              6144K
NUMA node0 CPU(s):     0,2,4,6,8,10,12,14
NUMA node1 CPU(s):     16,18,20,22,24,26,28,30
NUMA node2 CPU(s):     1,3,5,7,9,11,13,15
NUMA node3 CPU(s):     17,19,21,23,25,27,29,31

谢谢!


现在可以了。我的代码一定有问题,但我无法回滚查看它是什么。关闭。

4

2 回答 2

4

未使用所有内核的一个可能原因是 pool.apply_async 运行的目标函数完成得太快。这种情况下的解决方案是向目标函数发送更多数据(因此每次调用它会做更多的工作)。

这就像把煤铲进 32 个熔炉。如果你用小铲子,你可能只能在第一个炉子里的煤用完之前到达第五个炉子。然后你必须重新装满第一个熔炉。即使你有一大堆煤,你也永远无法使用所有的炉子。如果你使用足够大的铲子,那么你可以让所有的炉子都燃烧起来。

于 2013-03-06T15:55:30.780 回答
0

我有一个类似的问题,在我的情况下,我使用的是 gearman 并且希望每个核心都有工作人员,最初使用“Pool”,但注意到只有一个工作人员正在处理消息,所以我用下面的代码替换“Pool”以使用所有“cores - 1”,这样我就可以让工作人员同时读取队列:

if __name__ == '__main__':
jobs = []
for i in range(multiprocessing.cpu_count() - 1): 
    p = multiprocessing.Process(target=start_worker)
    jobs.append(p)
    p.start()

for j in jobs:
    j.join()
    print '%s.exitcode = %s' % (j.name, j.exitcode)

你怎么看 ?有更好的方法/想法来处理这个吗?

于 2013-03-31T02:09:13.190 回答