64

有没有办法为python多处理池中的每个工作人员分配一个唯一的ID,以便池中特定工作人员运行的作业可以知道哪个工作人员正在运行它?根据文档, aProcess有一个name但是

该名称是一个仅用于识别目的的字符串。它没有语义。多个进程可以被赋予相同的名称。

对于我的特定用例,我想在一组四个 GPU 上运行一堆作业,并且需要为应该运行作业的 GPU 设置设备号。因为作业的长度不均匀,所以我想确保在前一个作业完成之前尝试在 GPU 上运行的作业不会在 GPU 上发生冲突(因此这排除了将 ID 预先分配给工作单元提前)。

4

5 回答 5

103

看起来你想要的很简单:multiprocessing.current_process(). 例如:

import multiprocessing

def f(x):
    print multiprocessing.current_process()
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

输出:

$ python foo.py 
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-3, started daemon)>
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-4, started daemon)>
[0, 1, 4, 9, 16, 25]

这将返回进程对象本身,因此进程可以是它自己的身份。您也可以调用id它以获得唯一的数字 id——在 cpython 中,这是进程对象的内存地址,所以我认为没有任何重叠的可能性。最后,您可以使用进程的identpid属性——但这仅在进程启动后设置。

此外,查看源代码,在我看来,自动生成的名称(如Process上面 repr 字符串中的第一个值所示)很可能是唯一的。为每个进程multiprocessing维护一个对象,该对象用于为它产生的任何子进程生成一个元组。因此顶级进程产生具有单值 id 的子进程,它们产生具有双值 id 的进程,依此类推。然后,如果没有将名称传递给构造函数,它会简单地根据 _identity自动生成名称,使用. 然后使用更改进程的名称,使自动生成的 id 保持不变。itertools.counter_identityProcess':'.join(...)Pool replace

这一切的结果是,虽然两个Processes可能具有相同的名称,因为您可能在创建它们时为它们分配相同的名称,但如果您不触摸 name 参数,它们是唯一的。此外,理论上您可以_identity用作唯一标识符;但我认为他们将这个变量设为私有是有原因的!

上面的一个例子:

import multiprocessing

def f(x):
    created = multiprocessing.Process()
    current = multiprocessing.current_process()
    print 'running:', current.name, current._identity
    print 'created:', created.name, created._identity
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

输出:

$ python foo.py 
running: PoolWorker-1 (1,)
created: Process-1:1 (1, 1)
running: PoolWorker-2 (2,)
created: Process-2:1 (2, 1)
running: PoolWorker-3 (3,)
created: Process-3:1 (3, 1)
running: PoolWorker-1 (1,)
created: Process-1:2 (1, 2)
running: PoolWorker-2 (2,)
created: Process-2:2 (2, 2)
running: PoolWorker-4 (4,)
created: Process-4:1 (4, 1)
[0, 1, 4, 9, 16, 25]
于 2012-04-17T13:53:11.433 回答
7

您可以使用multiprocessing.Queue来存储 id,然后在池进程初始化时获取 id。

优点:

  • 您不需要依赖内部结构。
  • 如果您的用例是管理资源/设备,那么您可以直接输入设备编号。这也将确保没有设备被使用两次:如果池中的进程多于设备,则额外的进程将阻塞queue.get()并且不会执行任何工作(这不会阻塞您的 porgram,或者至少在我测试过)。

缺点:

  • 您有额外的通信开销,并且生成池进程需要更长的时间:没有sleep(1)示例中的所有工作可能由第一个进程执行,因为其他进程尚未完成初始化。
  • 你需要一个全球性的(或者至少我不知道如何解决它)

例子:

import multiprocessing
from time import sleep

def init(queue):
    global idx
    idx = queue.get()

def f(x):
    global idx
    process = multiprocessing.current_process()
    sleep(1)
    return (idx, process.pid, x * x)

ids = [0, 1, 2, 3]
manager = multiprocessing.Manager()
idQueue = manager.Queue()

for i in ids:
    idQueue.put(i)

p = multiprocessing.Pool(8, init, (idQueue,))
print(p.map(f, range(8)))

输出:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)]

请注意,只有 4 个不同的 pid,尽管池包含 8 个进程并且一个 idx 仅由一个进程使用。

于 2017-03-15T18:30:39.020 回答
1

我用线程做到了这一点,最终使用队列来处理作业管理。这是基线。我的完整版本有一堆try-catches(特别是在工作人员中,以确保q.task_done()即使在失败时也会调用)。

from threading import Thread
from queue import Queue
import time
import random


def run(idx, *args):
    time.sleep(random.random() * 1)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        while True:
            args = q.get()
            run(idx, *args)
            q.task_done()

    for job in jobs:
        q.put(job)

    for i in range(0, workers):
        t = Thread(target=worker, args=[i])
        t.daemon = True
        t.start()

    q.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

我不需要使用多处理(我的工作人员只是用于调用外部进程),但这可以扩展。用于多处理的 API 稍微改变了一点,以下是您可以适应的方式:

from multiprocessing import Process, Queue
from Queue import Empty
import time
import random

def run(idx, *args):
    time.sleep(random.random() * i)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        try:
            while True:
                args = q.get(timeout=1)
                run(idx, *args)
        except Empty:
            return

    for job in jobs:
        q.put(job)

    processes = []
    for i in range(0, workers):
        p = Process(target=worker, args=[i])
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes: 
        p.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

两个版本都会输出如下内容:

0 : ('job', 0)
1 : ('job', 2)
1 : ('job', 6)
3 : ('job', 3)
0 : ('job', 5)
1 : ('job', 7)
2 : ('job', 1)
4 : ('job', 4)
3 : ('job', 8)
0 : ('job', 9)
于 2017-06-05T15:10:25.587 回答
0

我不确定它会如何工作Pool,但打印Process会提供一些独特的输出:

x = Process(target=time.sleep, args=[20])
x.start()
print(x)  # <Process name='Process-5' pid=97121 parent=95732 started>
于 2020-10-05T14:10:08.547 回答
0

我设法通过使用 获取函数句柄来映射到类方法getattr,然后使用包装器来打包和解包尽可能多的参数,因为我想传递给被映射的方法。在我的例子中,我从启动池的同一个类中传递方法,但您也可以传递一个对象以映射到不同的类。

这是代码:

import multiprocessing
from multiprocessing import Pool


def warp(args):
    func = args[0]
    frame = args[1]
    left_over = args[2:]
    func(frame, *left_over)


class MyClass:

    def __init__(self):
        self.my_flag = 5

    def exec_method(self, method, int_list, *args):
        obj = getattr(self, method.__name__)

        packed = list()
        for i in int_list:
            pack = list()
            pack.append(obj)
            pack.append(i)
            for arg in args:
                pack.append(arg)
            packed.append(pack)

        print("Start")
        pool = Pool(processes=multiprocessing.cpu_count())
        pool.map(warp, packed)
        print("End")

    def method1(self, my_str):
        print(self.my_flag, my_str)

    def method2(self, i, print_str, bool_flat):
        print(multiprocessing.current_process(), self.my_flag, i, print_str, str(bool_flat))


cls: MyClass = MyClass()
cls.my_flag = 58
cls.exec_method(cls.method2, [1, 5, 10, 20, 30], "this is a string", True)

这是输出:

Start
<ForkProcess(ForkPoolWorker-1, started daemon)> 58 1 this is a string True
<ForkProcess(ForkPoolWorker-2, started daemon)> 58 5 this is a string True
<ForkProcess(ForkPoolWorker-4, started daemon)> 58 20 this is a string True
<ForkProcess(ForkPoolWorker-5, started daemon)> 58 30 this is a string True
<ForkProcess(ForkPoolWorker-3, started daemon)> 58 10 this is a string True
End
于 2020-11-25T08:08:25.803 回答