I have a large object of a type that cannot be shared between processes. It has methods to instantiate it and to work on its data.
The current way I'm doing it is I first instantiate the object in the main parent process and then pass it around to subprocesses when some event happens. The problem is that whenever the subprocesses run, they copy the object in memory every time which takes a while. I want to store it in memory that is only available to them so that they don't have to copy it each time they call that object's function.
How would I store an object just for that process's own use?
Edit: Code
class MultiQ:
    """Holds a large predictor object and fans prediction work out to
    child processes.

    NOTE(review): each Process() started from enq_essay pickles/copies
    `self` — and therefore self.pred — into the child, which is exactly
    the repeated memory copy the question is about.
    """

    def __init__(self):
        # Build the big model once, in the parent process.
        self.pred = instantiate_predict() #here I instantiate the big object

    def enq_essay(self,essay):
        # Spawn one child per essay; the child receives a copy of self.
        p = Process(target=self.compute_results, args=(essay,))
        p.start()

    def compute_results(self, essay):
        # Runs inside the child process; read-only use of the big object.
        predictions = self.pred.predict_fields(essay) #computation in the large object that doesn't modify the object
This copies the large object in memory every time. I am trying to avoid that.
Edit 4: short code sample that runs on 20 newsgroups data
import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging
import os
import numpy as np
import cPickle as pickle
def get_20newsgroups_fnames(root_dir="/home/roman/Desktop/20_newsgroups/"):
    """Return paths of all files found under *root_dir*'s subdirectories.

    The first os.walk() entry (the root directory itself) is skipped, so
    only files inside the category subdirectories are collected.

    Parameters
    ----------
    root_dir : str
        Directory tree to scan. Defaults to the original hard-coded
        path, so existing zero-argument callers are unaffected.
    """
    all_files = []
    for i, (root, dirs, files) in enumerate(os.walk(root_dir)):
        if i > 0:  # skip files sitting directly in the root directory
            # `fname`, not `file`, to avoid shadowing the builtin.
            all_files.extend([os.path.join(root, fname) for fname in files])
    return all_files
# Load every corpus file as unicode (Python 2), ignoring undecodable bytes.
# NOTE(review): the file objects opened here are never closed explicitly;
# CPython's refcounting closes them promptly, but other interpreters may not.
documents = [unicode(open(f).read(), errors="ignore") for f in get_20newsgroups_fnames()]

# Configure multiprocessing's own logger so each record is tagged with the
# emitting process name — used below to tell parent and children apart.
logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt = '%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
# NOTE(review): this pokes a private module attribute; the public API is
# mp.log_to_stderr() — confirm which one is intended.
mp._log_to_stderr = True
def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    wanted = ('MemFree', 'Buffers', 'Cached')
    kb_total = 0
    with open('/proc/meminfo', 'r') as meminfo:
        for raw_line in meminfo:
            entry = raw_line.strip()
            # str.startswith accepts a tuple of prefixes directly.
            if entry.startswith(wanted):
                label, value, unit = entry.split()
                amount_kb = int(value)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                kb_total += amount_kb
    return kb_total
def predict(large_object, essay="this essay will be predicted"):
    """Vectorize *essay* with large_object[0], predict with large_object[1],
    then log free memory — used to show the per-process copy cost."""
    vectorizer = large_object[0]
    model = large_object[1]
    model.predict(vectorizer.transform(essay))
    report_memory("done")
def train_and_model():
    """Fit a TF-IDF vectorizer and a logistic-regression model on the
    module-level `documents` corpus.

    Returns
    -------
    tuple
        (fitted TfidfVectorizer, fitted LogisticRegression)
    """
    tfidf_vect = ftext.TfidfVectorizer()
    X = tfidf_vect.fit_transform(documents)
    # Random 0/1 labels just so fit() can run.  Size follows the corpus
    # (X.shape[0]) instead of the hard-coded 19997, so any document count
    # works.  np.random.randint(0, 2, n) replaces the deprecated
    # np.random.random_integers(0, 1, n) and yields the same {0, 1} labels.
    y = np.random.randint(0, 2, X.shape[0])
    model = lm.LogisticRegression()
    model.fit(X, y)
    return (tfidf_vect, model)
def report_memory(label):
    """Log current free memory (kB from free_memory()) tagged with *label*.

    Emitted at WARNING level so it passes the logger's threshold set above.
    """
    f = free_memory()
    # warning() instead of the deprecated warn() alias.
    logger.warning('{l:<25}: {f}'.format(f=f, l=label))
def dump_large_object(large_object):
    """Pickle *large_object* to 'large_object.obj' in the working directory.

    Bug fix: pickle protocol 2 is a binary format, so the file must be
    opened in 'wb' (text 'w' corrupts the stream on some platforms and
    fails outright on Python 3).  The `with` block guarantees the file is
    closed even if pickling raises.
    """
    with open("large_object.obj", "wb") as f:
        pickle.dump(large_object, f, protocol=2)
def load_large_object():
    """Unpickle and return the object stored in 'large_object.obj'.

    Bug fix: pickle data is binary, so open with 'rb' (text mode breaks
    on some platforms and on Python 3).  The `with` block closes the file
    even if unpickling raises.
    """
    with open("large_object.obj", "rb") as f:
        return pickle.load(f)
if __name__ == '__main__':
    report_memory('Initial')
    # Train in the parent; this is the large allocation being measured.
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    # One worker per CPU.  Each Process pickles/copies large_object into
    # the child when it starts — the memory effect under investigation.
    procs = [mp.Process(target=predict, args=(large_object,))
             for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    report_memory('After p.start')
    for p in procs:
        p.join()
    report_memory('After p.join')
Output 1:
19:01:39: [ MainProcess] Initial : 26585728
19:01:51: [ MainProcess] After train_and_model : 25958924
19:01:51: [ MainProcess] After Process : 25958924
19:01:51: [ MainProcess] After p.start : 25925908
19:01:51: [ Process-1] done : 25725524
19:01:51: [ Process-2] done : 25781076
19:01:51: [ Process-4] done : 25789880
19:01:51: [ Process-3] done : 25802032
19:01:51: [ MainProcess] After p.join : 25958272
roman@ubx64:$ du -h large_object.obj
4.6M large_object.obj
So maybe the large object is not actually that large, and my real problem was the memory used by the TF-IDF vectorizer's transform method.
Now, if I change the main method to this:
report_memory('Initial')
# Variant: load the pre-pickled object instead of retraining, to separate
# the model's own footprint from the training-time memory usage.
large_object = load_large_object()
report_memory('After loading the object')
procs = [mp.Process(target=predict, args=(large_object,))
         for i in range(mp.cpu_count())]
report_memory('After Process')
for p in procs:
    p.start()
report_memory('After p.start')
for p in procs:
    p.join()
report_memory('After p.join')
I get these results: Output 2:
20:07:23: [ MainProcess] Initial : 26578356
20:07:23: [ MainProcess] After loading the object : 26544380
20:07:23: [ MainProcess] After Process : 26544380
20:07:23: [ MainProcess] After p.start : 26523268
20:07:24: [ Process-1] done : 26338012
20:07:24: [ Process-4] done : 26337268
20:07:24: [ Process-3] done : 26439444
20:07:24: [ Process-2] done : 26438948
20:07:24: [ MainProcess] After p.join : 26542860
Then I changed the main method to this:
report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
# Single in-process call — no multiprocessing — as a baseline comparison.
predict(large_object)
report_memory('After Process')
And got these results: Output 3:
20:13:34: [ MainProcess] Initial : 26572580
20:13:35: [ MainProcess] After loading the object : 26538356
20:13:35: [ MainProcess] done : 26513804
20:13:35: [ MainProcess] After Process : 26513804
At this point I have no idea what's going on, but the multiprocessing definitely uses more memory.