0

我正在处理来自一系列文件的数据。为了达到这个目的,我构建了一个类来分发数据。我启动了 4 个进程,它们将访问同一个类并检索数据。问题是,如果我使用类方法 (retrieve()) 来检索数据,内存会不断增加。如果我不这样做,内存是稳定的,即使数据通过 getData() 不断刷新。如何在检索数据时保持稳定的内存使用?或任何其他方式来实现相同的目标?

import pandas as pd
from multiprocessing import Process, RLock
from multiprocessing.managers import BaseManager 

class myclass():
    def __init__(self, path):
        self.path = path
        self.lock = RLock()
        self.getIter()

    def getIter(self):
        self.iter = pd.read_csv(self.path, chunksize=1000)

    def getData(self):
        with self.lock:
            try:
                self.data = next(self.iter)
            except:
                self.getIter()
                self.data = next(self.iter)

    def retrieve(self):
        return self.data

def worker(c):
    while True:
        c.getData()
        # Uncommenting the following line, memory usage goes up
        data = c.retrieve()

#Generate a testing file
with open('tmp.csv', 'w') as f:
    for i in range(1000000):
        f.write('%f\n'%(i*1.))

BaseManager.register('myclass', myclass)
bm = BaseManager()
bm.start()
c = bm.myclass('tmp.csv')

for i in range(4):
    p = Process(target=worker, args=(c,))
    p.start()
4

1 回答 1

0

我无法找出原因也无法解决它,但是在将返回变量的数据类型从 pandas.DataFrame 更改为 str(json 字符串)之后,问题就出现了。

于 2019-04-16T06:25:28.253 回答