
I have a large object of a type that cannot be shared between processes. It has methods to instantiate it and to work on its data.

Currently, I instantiate the object in the main parent process and then pass it to subprocesses when some event happens. The problem is that whenever a subprocess runs, it copies the object in memory, which takes a while. I want to store the object in memory that is available only to the subprocesses, so that they don't have to copy it each time they call one of its methods.

How would I store an object just for that process's own use?

Edit: Code

from multiprocessing import Process

class MultiQ:
    def __init__(self):
        self.pred = instantiate_predict()  # here I instantiate the big object

    def enq_essay(self, essay):
        p = Process(target=self.compute_results, args=(essay,))
        p.start()

    def compute_results(self, essay):
        predictions = self.pred.predict_fields(essay)  # computation in the large object that doesn't modify the object

This copies the large object in memory every time. I am trying to avoid that.

Edit 4: short code sample that runs on 20 newsgroups data

import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging
import os
import numpy as np
import cPickle as pickle


def get_20newsgroups_fnames():
    all_files = []
    for i, (root, dirs, files) in enumerate(os.walk("/home/roman/Desktop/20_newsgroups/")):
        if i > 0:
            all_files.extend([os.path.join(root, fname) for fname in files])
    return all_files

documents = [unicode(open(f).read(), errors="ignore") for f in get_20newsgroups_fnames()]
logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt='%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True


def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total


def predict(large_object, essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")


def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    X = tfidf_vect.fit_transform(documents)
    y = np.random.random_integers(0, 1, 19997)  # one random binary label per document
    model = lm.LogisticRegression()
    model.fit(X, y)
    return (tfidf_vect, model)


def report_memory(label):
    f = free_memory()
    logger.warn('{l:<25}: {f}'.format(f=f, l=label))

def dump_large_object(large_object):
    # open in binary mode: pickle protocol 2 is a binary format
    with open("large_object.obj", "wb") as f:
        pickle.dump(large_object, f, protocol=2)

def load_large_object():
    with open("large_object.obj", "rb") as f:
        return pickle.load(f)

if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict, args=(large_object,))
             for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    report_memory('After p.start')
    for p in procs:
        p.join()
    report_memory('After p.join')

Output 1:

19:01:39: [ MainProcess] Initial                  : 26585728
19:01:51: [ MainProcess] After train_and_model    : 25958924
19:01:51: [ MainProcess] After Process            : 25958924
19:01:51: [ MainProcess] After p.start            : 25925908
19:01:51: [   Process-1] done                     : 25725524
19:01:51: [   Process-2] done                     : 25781076
19:01:51: [   Process-4] done                     : 25789880
19:01:51: [   Process-3] done                     : 25802032
19:01:51: [ MainProcess] After p.join             : 25958272
roman@ubx64:$ du -h large_object.obj
4.6M    large_object.obj

So maybe the large object is not even that large, and my real problem was the memory used by the TfidfVectorizer's transform method.

Now, if I change the main method to this:

report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
procs = [mp.Process(target=predict, args=(large_object,))
         for i in range(mp.cpu_count())]
report_memory('After Process')
for p in procs:
    p.start()
report_memory('After p.start')
for p in procs:
    p.join()
report_memory('After p.join')

I get these results:

Output 2:

20:07:23: [ MainProcess] Initial                  : 26578356
20:07:23: [ MainProcess] After loading the object : 26544380
20:07:23: [ MainProcess] After Process            : 26544380
20:07:23: [ MainProcess] After p.start            : 26523268
20:07:24: [   Process-1] done                     : 26338012
20:07:24: [   Process-4] done                     : 26337268
20:07:24: [   Process-3] done                     : 26439444
20:07:24: [   Process-2] done                     : 26438948
20:07:24: [ MainProcess] After p.join             : 26542860

Then I changed the main method to this:

report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
predict(large_object)
report_memory('After Process')

And got these results:

Output 3:

20:13:34: [ MainProcess] Initial                  : 26572580
20:13:35: [ MainProcess] After loading the object : 26538356
20:13:35: [ MainProcess] done                     : 26513804
20:13:35: [ MainProcess] After Process            : 26513804

At this point I have no idea what's going on, but multiprocessing definitely uses more memory.


2 Answers


Linux uses copy-on-write, which means that when a subprocess is forked, the global variables in each subprocess share the same memory addresses until a value is modified. Only when a value is modified is it copied.
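
As a minimal stand-alone illustration of this (my sketch, not part of the original answer), a forked child can read a module-level global directly, without the object ever being pickled or passed as an argument:

import multiprocessing as mp

# Module-level object, created before any fork, so children inherit it.
big = [0] * (10 ** 7)

def child():
    # The child reads the inherited global directly; nothing is pickled.
    # On Linux the pages are shared copy-on-write, although CPython's
    # reference counting writes to object headers, so some pages can
    # still be copied even for read-only access.
    assert sum(big) == 0

if __name__ == '__main__':
    p = mp.Process(target=child)
    p.start()
    p.join()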

So in theory, if the large object is not modified, it can be used by the subprocesses without consuming more memory. Let's test that theory.

Here is your code, with some memory-usage logging added:

import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging

logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt='%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True


def predict(essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")


def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    N = 100000
    corpus = [
        'This is the first document.',
        'This is the second second document.',
        'And the third one.',
        'Is this the first document?', ] * N
    y = [1, 0, 1, 0] * N
    report_memory('Before fit_transform')
    X = tfidf_vect.fit_transform(corpus)
    model = lm.LogisticRegression()
    model.fit(X, y)
    report_memory('After model.fit')
    return (tfidf_vect, model)


def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total


def gen_change_in_memory():
    f = free_memory()
    diff = 0
    while True:
        yield diff
        f2 = free_memory()
        diff = f - f2
        f = f2
change_in_memory = gen_change_in_memory().next

def report_memory(label):
    logger.warn('{l:<25}: {d:+d}'.format(d=change_in_memory(), l=label))

if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict) for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    report_memory('After p.join')

It yields:

21:45:01: [ MainProcess] Initial                  : +0
21:45:01: [ MainProcess] Before fit_transform     : +3224
21:45:12: [ MainProcess] After model.fit          : +153572
21:45:12: [ MainProcess] After train_and_model    : -3100
21:45:12: [ MainProcess] After Process            : +0
21:45:12: [   Process-1] done                     : +2232
21:45:12: [   Process-2] done                     : +2976
21:45:12: [   Process-3] done                     : +3596
21:45:12: [   Process-4] done                     : +3224
21:45:12: [ MainProcess] After p.join             : -372

The numbers reported are the changes in free memory (including cached and buffered memory), in KiB. So, for example, the change in free memory between 'Initial' and 'After train_and_model' was about 150MB. Thus, large_object requires about 150MB.

Then, after the 4 subprocesses have finished, a much smaller amount of memory has been consumed -- about 12MB in total. The memory consumed is probably due to the creation of the subprocesses plus the memory used by the transform and predict methods themselves.

So it appears that large_object is not being copied, since if it were, we should have seen an increase of about 150MB in the memory consumed.


Regarding your runs on the 20 newsgroups data

Here are the changes in free memory:

On the 20 newsgroups data (your Output 1, converted to deltas):

| Initial               |       0 |
| After train_and_model |  626804 | <-- Large object requires 627M
| After Process         |       0 |
| After p.start         |   33016 |
| done                  |  200384 | 
| done                  |  -55552 |
| done                  |   -8804 |
| done                  |  -12152 |
| After p.join          | -156240 |

So it looks like instantiating the large object requires 627MB. I have no idea why an additional 200+MB is consumed after the first done is reached.

Using load_large_object (your Output 2):

| Initial                  |       0 |
| After loading the object |   33976 |
| After Process            |       0 |
| After p.start            |   21112 |
| done                     |  185256 |
| done                     |     744 |
| done                     | -102176 |
| done                     |     496 |
| After p.join             | -103912 |

Apparently, large_object itself requires only 34MB; the rest of the memory, 627 - 34 = 593MB, must have been consumed by the fit_transform and fit calls inside train_and_model.

Using a single process (your Output 3):

| Initial                  |     0 |
| After loading the object | 34224 |
| done                     | 24552 |
| After Process            |     0 |

That makes sense.

So the data you have gathered seems to support the claim that the large object itself is not copied by each subprocess. But a new mystery appears: why is there a large consumption of memory between 'After p.start' and the first 'done'? I don't know the answer.


You could try placing report_memory calls around

vectorized_essay = large_object[0].transform(essay)

and

large_object[1].predict(vectorized_essay)

to see where the extra memory is being consumed. My guess is that one of these scikit-learn methods decides to allocate this (relatively) large amount of memory.
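
For instance, an instrumented version of predict might look like this (a sketch on my part; it reuses the report_memory helper defined above, and the labels are only illustrative):

def predict(essay="this essay will be predicted"):
    report_memory("before transform")
    vectorized_essay = large_object[0].transform(essay)
    report_memory("after transform")
    large_object[1].predict(vectorized_essay)
    report_memory("after predict")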

Answered 2013-01-21T19:42:17.517

I ended up using an RPC server based on RabbitMQ (see the RabbitMQ RPC/Python tutorial). I create as many servers as there are CPUs on my machine. The servers start once, allocate the memory for the model and the vectorizer once, and hold onto it while they run. Additional advantages are:

  1. If one machine gets overwhelmed, some of the processing can easily be sent to another machine.
  2. If a computation fails on one server, it can easily be sent to another server.
  3. The memory-allocation step in the original code was not instantaneous, so the overall runtime on my dataset dropped from 18 seconds per query to 12 seconds, since the memory is allocated in advance.

Overall, my code also became much cleaner.
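
A minimal worker along those lines might look like the following sketch (mine, not the original code; it assumes the pika library with its 0.9-era API as used in the RabbitMQ tutorial, and the queue name 'predict_rpc' is purely illustrative):

import cPickle as pickle
import pika

# Load the vectorizer and model once, at server startup (the file was
# written by dump_large_object in the question).
with open("large_object.obj", "rb") as f:
    large_object = pickle.load(f)

def on_request(ch, method, props, body):
    # body is the essay text; reply with the prediction on the caller's queue
    vectorized_essay = large_object[0].transform([body])
    prediction = large_object[1].predict(vectorized_essay)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(prediction))
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='predict_rpc')
channel.basic_qos(prefetch_count=1)  # hand each server one request at a time
channel.basic_consume(on_request, queue='predict_rpc')
channel.start_consuming()

Running one such process per CPU gives the pool of pre-warmed servers described above.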

Answered 2013-02-15T02:01:12.470