123

通过多处理生成的子进程是否共享程序中先前创建的对象?

我有以下设置:

do_some_processing(filename):
    for line in file(filename):
        if line.split(',')[0] in big_lookup_object:
            # something here

if __name__ == '__main__':
    big_lookup_object = marshal.load('file.bin')
    pool = Pool(processes=4)
    print pool.map(do_some_processing, glob.glob('*.data'))

我正在将一些大对象加载到内存中,然后创建一个需要使用该大对象的工作人员池。大对象是只读访问的,我不需要在进程之间传递它的修改。

我的问题是:大对象是否加载到共享内存中,就像我在 unix/c 中生成一个进程一样,还是每个进程都加载自己的大对象副本?

更新:进一步澄清 - big_lookup_object 是一个共享查找对象。我不需要将其拆分并单独处理。我需要保留一份。我需要拆分它的工作是读取许多其他大文件并根据查找对象查找这些大文件中的项目。

进一步更新:数据库是一个很好的解决方案,memcached 可能是一个更好的解决方案,磁盘上的文件(搁置或 dbm)可能会更好。在这个问题中,我对内存解决方案特别感兴趣。对于最终解决方案,我将使用 hadoop,但我想看看我是否也可以拥有本地内存版本。

4

8 回答 8

59

通过多处理生成的子进程是否共享程序中先前创建的对象?

Python < 3.8否,Python ≥ 3.8 是

进程有独立的内存空间。

解决方案 1

为了充分利用拥有大量工人的大型结构,请执行此操作。

  1. 将每个工作人员写成一个“过滤器”——从 读取中间结果stdin,执行工作,将中间结果写入stdout.

  2. 将所有工作人员连接为管道:

    process1 <source | process2 | process3 | ... | processn >result
    

每个进程读取、工作和写入。

这是非常有效的,因为所有进程都是同时运行的。写入和读取直接通过进程之间的共享缓冲区。


解决方案 2

在某些情况下,你有一个更复杂的结构——通常是一个扇出结构。在这种情况下,您有一个有多个孩子的父母。

  1. 父级打开源数据。父母分叉了许多孩子。

  2. 父级读取源代码,将部分源代码分配给每个同时运行的子级。

  3. 当父母到达终点时,关闭管道。孩子得到文件的结尾并正常完成。

儿童部分写起来很愉快,因为每个孩子都只是简单地阅读sys.stdin

父母在生成所有孩子和正确保留管道方面有一点花哨的步法,但这还不错。

扇入是相反的结构。许多独立运行的进程需要将它们的输入交错到一个公共进程中。收集器不那么容易编写,因为它必须从许多来源中读取。

通常使用select模块从许多命名管道中读取,以查看哪些管道有待处理的输入。


解决方案 3

共享查找是数据库的定义。

解决方案 3A – 加载数据库。让工作人员处理数据库中的数据。

解决方案 3B——使用werkzeug (或类似的)创建一个非常简单的服务器,以提供响应 HTTP GET 的 WSGI 应用程序,以便工作人员可以查询服务器。


解决方案 4

共享文件系统对象。Unix OS 提供共享内存对象。这些只是映射到内存的文件,以便交换 I/O 而不是更多的约定缓冲读取。

您可以通过多种方式从 Python 上下文中执行此操作

  1. 编写一个启动程序,(1) 将你原来的巨大对象分解成更小的对象,(2) 启动工作人员,每个工作人员都有一个更小的对象。较小的对象可以是腌制的 Python 对象,以节省一点点文件读取时间。

  2. 编写一个启动程序,该程序 (1) 读取您的原​​始巨大对象并使用操作写入一个页面结构的字节编码文件,seek以确保通过简单的查找可以轻松找到各个部分。这就是数据库引擎所做的——将数据分成页面,通过seek.

生成有权访问此大型页面结构文件的工作人员。每个工人都可以找到相关的部分并在那里工作。

于 2009-03-18T20:06:08.807 回答
39

通过多处理生成的子进程是否共享程序中先前创建的对象?

这取决于。对于全局只读变量,通常可以考虑(除了消耗的内存),否则不应该。

multiprocessing的文档说:

Better to inherit than pickle/unpickle

在 Windows 上,来自多处理的许多类型需要是可挑选的,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承它。

Explicitly pass resources to child processes

在 Unix 上,子进程可以使用在父进程中使用全局资源创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。

除了使代码(可能)与 Windows 兼容之外,这还确保只要子进程还活着,该对象就不会在父进程中被垃圾收集。如果在父进程中对对象进行垃圾收集时释放了某些资源,这可能很重要。

Global variables

请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有)可能与调用 Process.start() 时父进程中的值不同.

例子

在 Windows 上(单 CPU):

#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool

x = 23000 # replace `23` due to small integers share representation
z = []    # integers are immutable, let's try mutable object

def printx(y):
    global x
    if y == 3:
       x = -x
    z.append(y)
    print os.getpid(), x, id(x), z, id(z) 
    print y
    if len(sys.argv) == 2 and sys.argv[1] == "sleep":
       time.sleep(.1) # should make more apparant the effect

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(printx, (1,2,3,4))

sleep

$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4

没有sleep

$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4
于 2009-03-18T23:17:12.623 回答
34

S.Lott是正确的。Python 的多处理快捷方式有效地为您提供了一个单独的、重复的内存块。

实际上,在大多数 *nix 系统上,使用对 will 的较低级别调用os.fork()会为您提供写时复制内存,这可能是您的想法。AFAIK,理论上,在最简单的程序中,您可以从该数据中读取而无需复制。

然而,在 Python 解释器中事情并不是那么简单。对象数据和元数据存储在同一个内存段中,因此即使对象永远不会更改,诸如该对象的引用计数器之类的递增操作也会导致内存写入,从而导致复制。几乎所有的 Python 程序所做的不仅仅是“print 'hello'”,都会导致引用计数增加,因此您可能永远不会意识到写时复制的好处。

即使有人确实设法破解了 Python 中的共享内存解决方案,尝试跨进程协调垃圾收集可能会非常痛苦。

于 2009-03-18T20:45:37.997 回答
8

如果您在 Unix 下运行,由于fork 的工作方式,它们可能共享同一个对象(即,子进程具有单独的内存,但它是写时复制的,因此只要没有人修改它就可以共享它)。我尝试了以下方法:

import multiprocessing

x = 23

def printx(y):
    print x, id(x)
    print y

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(printx, (1,2,3,4))

并得到以下输出:

$ ./mtest.py
23 22995656
1
23 22995656
2
23 22995656
3
23 22995656
4

当然,这并不能证明没有制作副本,但是您应该能够通过查看输出来验证您的情况,ps以查看每个子进程正在使用多少实际内存。

于 2009-03-18T20:44:52.350 回答
3

不同的进程有不同的地址空间。就像运行解释器的不同实例一样。这就是 IPC(进程间通信)的用途。

为此,您可以使用队列或管道。如果您想稍后通过网络分发进程,您也可以使用 rpc over tcp。

http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes

于 2009-03-18T20:14:54.823 回答
1

与多处理本身没有直接关系,但从您的示例来看,您似乎可以只使用搁置模块或类似的东西。“big_lookup_object”真的必须完全在内存中吗?

于 2009-03-18T21:35:08.517 回答
1

不可以,但您可以将数据作为子进程加载,并允许它与其他子进程共享数据。见下文。

import time
import multiprocessing

def load_data( queue_load, n_processes )

    ... load data here into some_variable

    """
    Store multiple copies of the data into
    the data queue. There needs to be enough
    copies available for each process to access. 
    """

    for i in range(n_processes):
        queue_load.put(some_variable)


def work_with_data( queue_data, queue_load ):

    # Wait for load_data() to complete
    while queue_load.empty():
        time.sleep(1)

    some_variable = queue_load.get()

    """
    ! Tuples can also be used here
    if you have multiple data files
    you wish to keep seperate.  
    a,b = queue_load.get()
    """

    ... do some stuff, resulting in new_data

    # store it in the queue
    queue_data.put(new_data)


def start_multiprocess():

    n_processes = 5

    processes = []
    stored_data = []

    # Create two Queues
    queue_load = multiprocessing.Queue()
    queue_data = multiprocessing.Queue()

    for i in range(n_processes):

        if i == 0:
            # Your big data file will be loaded here...
            p = multiprocessing.Process(target = load_data,
            args=(queue_load, n_processes))

            processes.append(p)
            p.start()   

        # ... and then it will be used here with each process
        p = multiprocessing.Process(target = work_with_data,
        args=(queue_data, queue_load))

        processes.append(p)
        p.start()

    for i in range(n_processes)
        new_data = queue_data.get()
        stored_data.append(new_data)    

    for p in processes:
        p.join()
    print(processes)    
于 2019-03-20T02:36:02.260 回答
-3

对于 Linux/Unix/MacOS 平台,forkmap 是一个快速而简单的解决方案。

于 2015-12-07T21:37:24.883 回答