9

我有多个进程,每个进程都处理具有 40000 个元组的列表。这几乎最大化了机器上的可用内存。如果我这样做:

        while len(collection) > 0:
            row = collection.pop(0)
            row_count = row_count + 1
            new_row = []
            for value in row:
                if value is not None:
                    in_chars = str(value)
                else:
                    in_chars = ""

                #escape any naughty characters
                new_row.append("".join(["\\" + c if c in redshift_escape_chars else c for c in in_chars]))
            new_row = "\t".join(new_row)
            rows += "\n"+new_row
            if row_count % 5000 == 0:
                gc.collect()

这会释放更多内存吗?

4

2 回答 2

8

由于它以与增长collection相同的速度缩小,因此rows您的内存使用量将保持稳定。这个gc.collect()电话不会有太大的不同。

CPython 中的内存管理很微妙。仅仅因为您删除引用并运行收集周期并不一定意味着内存将返回给操作系统。有关详细信息,请参阅此答案

为了真正节省内存,您应该围绕生成器和迭代器而不是大型项目列表来构建此代码。我很惊讶你说你有连接超时,因为获取所有行不应该比一次获取一行并执行你正在做的简单处理花费更多的时间。也许我们应该看看你的 db-fetching 代码?

如果一次一行的处理确实不可能,那么至少将您的数据保留为不可变的双端队列,并使用生成器和迭代器对其执行所有处理。

我将概述这些不同的方法。

首先,一些常用的功能:

# if you don't need random-access to elements in a sequence
# a deque uses less memory and has faster appends and deletes
# from both the front and the back.
from collections import deque
from itertools import izip, repeat, islice, chain
import re

re_redshift_chars = re.compile(r'[abcdefg]')

def istrjoin(sep, seq):
    """Return a generator that acts like sep.join(seq), but lazily

    The separator will be yielded separately
    """
    return islice(chain.from_iterable(izip(repeat(sep), seq)), 1, None)

def escape_redshift(s):
    return re_redshift_chars.sub(r'\\\g<0>', s)

def tabulate(row):
    return "\t".join(escape_redshift(str(v)) if v is not None else '' for v in row)

现在理想的是一次行处理,如下所示:

cursor = db.cursor()
cursor.execute("""SELECT * FROM bigtable""")
rowstrings = (tabulate(row) for row in cursor.fetchall())
lines = istrjoin("\n", rowstrings)
file_like_obj.writelines(lines)
cursor.close()

这将占用尽可能少的内存——一次只占用一行。

如果确实需要存储整个结果集,可以稍微修改一下代码:

cursor = db.cursor()
cursor.execute("SELECT * FROM bigtable")
collection = deque(cursor.fetchall())
cursor.close()
rowstrings = (tabulate(row) for row in collection)
lines = istrjoin("\n", rowstrings)
file_like_obj.writelines(lines)

现在我们将所有结果收集到collectionfirst 中,它完全保留在内存中以供整个程序运行。

但是,我们也可以复制您在使用时删除收集项的方法。我们可以通过创建一个在工作时清空其源集合的生成器来保持相同的“代码形状” 。它看起来像这样:

def drain(coll):
    """Return an iterable that deletes items from coll as it yields them.

    coll must support `coll.pop(0)` or `del coll[0]`. A deque is recommended!
    """
    if hasattr(coll, 'pop'):
        def pop(coll):
            try:
                return coll.pop(0)
            except IndexError:
                raise StopIteration
    else:
        def pop(coll):
            try:
                item = coll[0]
            except IndexError:
                raise StopIteration
            del coll[0]
            return item
    while True:
        yield pop(coll)

现在,您可以轻松地在您想随时释放内存时进行drain(collection)替换。collection用完后drain(collection)collection对象将为空。

于 2013-04-05T05:31:22.447 回答
2

如果您的算法依赖于从左侧或列表开头弹出,您可以使用集合中的deque对象作为更快的替代方案。

作为对比:

import timeit

f1='''
q=deque()
for i in range(40000):
    q.append((i,i,'tuple {}'.format(i)))

while q:
    q.popleft()
'''

f2='''
l=[]
for i in range(40000):
    l.append((i,i,'tuple {}'.format(i)))

while l:
    l.pop(0)
'''

print 'deque took {:.2f} seconds to popleft()'.format(timeit.timeit(stmt=f1, setup='from collections import deque',number=100))
print 'list took {:.2f} seconds to pop(0)'.format(timeit.timeit(stmt=f2,number=100))

印刷:

deque took 3.46 seconds to to popleft()
list took 37.37 seconds to pop(0)

因此,对于从列表或队列开头弹出的这种特殊测试,deque速度要快 10 倍以上。

然而,这一巨大优势仅适用于左侧。如果您使用 pop() 对两者运行相同的测试,则速度大致相同。您还可以将列表反转并从右侧弹出,以获得与双端队列中的 popleft 相同的结果。


就“效率”而言,处理数据库中的单行会更有效率。如果这不是一个选项,请处理您的列表(或双端队列)“集合”。

沿着这些思路尝试一些东西。

首先,拆分行处理:

def process_row(row):
    # I did not test this obviously, but I think I xlated your row processing faithfully
    new_row = []
    for value in row:
        if value:
            in_chars = str(value)        
        else
            in_char=''
        new_row.append("".join(["\\" + c if c in redshift_escape_chars else c for c in in_chars]))  
    return '\t'.join(new_row)    

现在看看使用双端队列来允许从左侧快速弹出:

def cgen(collection):
    # if collection is a deque:
    while collection:
        yield '\n'+process_row(collection.popleft())

或者如果你想坚持一个列表:

def cgen(collection):
    collection.reverse()
    while collection:
        yield '\n'+process_row(collection.pop())

我认为你原来的 pop(0) 方法,处理行,每 5000 行调用一次 gc 可能不是最理想的。无论如何,gc 将被自动调用的频率远远高于此。

我的最终建议:

  1. 使用deque. 它就像一个list但更快的左侧推送或弹出;
  2. 使用popleft()这样您就不需要反转列表(如果顺序collection有意义);
  3. 作为生成器处理您的收藏;
  4. 抛弃调用 gc 的概念,因为它对你没有任何作用。
  5. 如果您可以只调用数据库并获得 1 行并一次处理 1 行,请在此处抛出 1-4!
于 2013-04-05T04:56:39.183 回答