42

我有一张很大的桌子。它目前在 MySQL 数据库中。我用django。

我需要遍历表的每个元素以预先计算一些特定的数据(也许如果我做得更好,我可以这样做,但这不是重点)。

我想通过不断使用内存来尽可能快地保持迭代。

正如在 *Large* Django QuerySet 中限制内存使用以及为什么迭代大型 Django QuerySet 会消耗大量内存中已经明确指出的那样?,对 django 中所有对象的简单迭代将杀死机器,因为它将从数据库中检索所有对象。

寻求解决方案

首先,为了减少你的内存消耗,你应该确保 DEBUG 是 False (或者猴子补丁游标:在保持 settings.DEBUG 的同时关闭 SQL 日志记录?)以确保 django 没有存储东西connections进行调试。

但即便如此,

for model in Model.objects.all()

是不行的。

即使是稍微改进的形式:

for model in Model.objects.all().iterator()

Usingiterator()将通过不在内部存储缓存结果来节省一些内存(尽管不一定在 PostgreSQL 上!);但显然仍会从数据库中检索整个对象。

一个天真的解决方案

一个问题的解决方案是根据计数器将结果切片 a chunk_size。有几种编写方法,但基本上它们都归结为OFFSET + LIMITSQL 中的查询。

就像是:

qs = Model.objects.all()
counter = 0
count = qs.count()
while counter < count:     
    for model in qs[counter:counter+count].iterator()
        yield model
    counter += chunk_size

虽然这是高效的内存(与 成比例的恒定内存使用chunk_size),但它在速度方面确实很差:随着 OFFSET 的增长,MySQL 和 PostgreSQL(可能还有大多数 DB)都将开始阻塞和减速。

更好的解决方案

Thierry Schellenbach在这篇文章中提供了一个更好的解决方案。它在 PK 上进行过滤,这比偏移快得多(多快可能取决于数据库)

pk = 0
last_pk = qs.order_by('-pk')[0].pk
queryset = qs.order_by('pk')
while pk < last_pk:
    for row in qs.filter(pk__gt=pk)[:chunksize]:
        pk = row.pk
        yield row
    gc.collect()

这开始变得令人满意。现在内存 = O(C),速度 ~= O(N)

“更好”解决方案的问题

只有当 PK 在 QuerySet 中可用时,更好的解决方案才有效。不幸的是,情况并非总是如此,特别是当 QuerySet 包含不同 (group_by) 和/或值 (ValueQuerySet) 的组合时。

对于这种情况,不能使用“更好的解决方案”。

我们能做得更好吗?

现在我想知道我们是否可以更快地避免关于没有 PK 的 QuerySets 的问题。也许使用我在其他答案中找到的东西,但仅限于纯 SQL:使用cursors

由于我对原始 SQL 非常不满意,尤其是在 Django 中,所以真正的问题来了:

我们如何为大表构建更好的 Django QuerySet 迭代器

我从我读过的内容中得出的结论是,我们应该使用服务器端游标(显然(请参阅参考资料)使用标准 Django 游标不会达到相同的结果,因为默认情况下 python-MySQL 和 psycopg 连接器都会缓存结果)。

这真的会是一个更快(和/或更有效)的解决方案吗?

这可以在 django 中使用原始 SQL 来完成吗?还是我们应该根据数据库连接器编写特定的 python 代码?

PostgreSQLMySQL中的服务器端游标

这就是我目前所能得到的……

一个姜戈chunked_iterator()

现在,当然最好的方法是让这种方法作为queryset.iterator(),而不是iterate(queryset),并成为 django 核心的一部分,或者至少是一个可插入的应用程序。

更新感谢评论中的“T”找到带有一些附加信息的django 票。连接器行为的差异使得最好的解决方案可能是创建一个特定的chunked方法,而不是透明地扩展iterator(听起来对我来说是个好方法)。存在一个实现存根,但一年内没有任何工作,而且看起来作者还没有准备好跳上它。

附加参考:

  1. 为什么 MYSQL 更高的 LIMIT 偏移量会减慢查询速度?
  2. 如何加快 LIMIT 子句中偏移量较大的 MySQL 查询?
  3. http://explainextended.com/2009/10/23/mysql-order-by-limit-performance-late-row-lookups/
  4. postgresql:offset + limit 变得非常慢
  5. 提高 PostgreSQL 中的 OFFSET 性能
  6. http://www.depesz.com/2011/05/20/pagination-with-fixed-order/
  7. 如何在 MySQL 中的 python服务器端游标中获取逐行 MySQL ResultSet

编辑:

Django 1.6 正在添加持久数据库连接

Django 数据库持久连接

在某些情况下,这应该有助于使用游标。仍然超出了我目前的技能(和学习时间)如何实施这样的解决方案..

此外,“更好的解决方案”绝对不适用于所有情况,不能用作通用方法,只能根据具体情况调整存根......

4

3 回答 3

3

基本答案:将原始 SQL 与服务器端游标一起使用

遗憾的是,在 Django 1.5.2 之前,没有正式的方法可以创建服务器端 MySQL 游标(不确定其他数据库引擎)。所以我写了一些神奇的代码来解决这个问题。

对于 Django 1.5.2 和 MySQLdb 1.2.4,以下代码将起作用。此外,它的评论很好。

注意:这不是基于公共 API,因此它可能会在未来的 Django 版本中中断。

# This script should be tested under a Django shell, e.g., ./manage.py shell

from types import MethodType

import MySQLdb.cursors
import MySQLdb.connections
from django.db import connection
from django.db.backends.util import CursorDebugWrapper


def close_sscursor(self):
    """An instance method which replace close() method of the old cursor.

    Closing the server-side cursor with the original close() method will be
    quite slow and memory-intensive if the large result set was not exhausted,
    because fetchall() will be called internally to get the remaining records.
    Notice that the close() method is also called when the cursor is garbage 
    collected.

    This method is more efficient on closing the cursor, but if the result set
    is not fully iterated, the next cursor created from the same connection
    won't work properly. You can avoid this by either (1) close the connection 
    before creating a new cursor, (2) iterate the result set before closing 
    the server-side cursor.
    """
    if isinstance(self, CursorDebugWrapper):
        self.cursor.cursor.connection = None
    else:
        # This is for CursorWrapper object
        self.cursor.connection = None


def get_sscursor(connection, cursorclass=MySQLdb.cursors.SSCursor):
    """Get a server-side MySQL cursor."""
    if connection.settings_dict['ENGINE'] != 'django.db.backends.mysql':
        raise NotImplementedError('Only MySQL engine is supported')
    cursor = connection.cursor()
    if isinstance(cursor, CursorDebugWrapper):
        # Get the real MySQLdb.connections.Connection object
        conn = cursor.cursor.cursor.connection
        # Replace the internal client-side cursor with a sever-side cursor
        cursor.cursor.cursor = conn.cursor(cursorclass=cursorclass)
    else:
        # This is for CursorWrapper object
        conn = cursor.cursor.connection
        cursor.cursor = conn.cursor(cursorclass=cursorclass)
    # Replace the old close() method
    cursor.close = MethodType(close_sscursor, cursor)
    return cursor


# Get the server-side cursor
cursor = get_sscursor(connection)

# Run a query with a large result set. Notice that the memory consumption is low.
cursor.execute('SELECT * FROM million_record_table')

# Fetch a single row, fetchmany() rows or iterate it via "for row in cursor:"
cursor.fetchone()

# You can interrupt the iteration at any time. This calls the new close() method,
# so no warning is shown.
cursor.close()

# Connection must be close to let new cursors work properly. see comments of
# close_sscursor().
connection.close()
于 2013-08-29T09:09:16.333 回答
3

简单的答案

如果你只需要迭代表本身而不做任何花哨的事情,Django 带有一个内置的迭代器

queryset.iterator()

这会导致 Django 清理它自己的缓存以减少内存使用。请注意,对于真正的大表,这可能还不够。


复杂的答案

如果您要对每个对象执行更复杂的操作或拥有大量数据,则必须编写自己的。以下是一个查询集迭代器,它将查询集拆分为块,并且不会比基本迭代器慢多少(它将是数据库查询的线性数量,而不是 1,但每 1,000 行只有一个查询)。该函数按主键分页,这是高效实现所必需的,因为偏移量在大多数 SQL 数据库中是线性时间操作。

def queryset_iterator(queryset, page_size=1000):
    if not queryset:
        return
    max_pk = queryset.order_by("-pk")[0].pk
    # Scale the page size up by the average density of primary keys in the queryset
    adjusted_page_size = int(page_size * max_pk / queryset.count())
    
    pages = int(max_pk / adjusted_page_size) + 1
    for page_num in range(pages):
        lower = page_num * adjusted_page_size
        page = queryset.filter(pk__gte=lower, pk__lt=lower+page_size)
        for obj in page:
            yield obj

使用看起来像:

for obj in queryset_iterator(Model.objects.all()):
    # do stuff

这段代码有三个假设:

  1. 您的主键是整数(这不适用于 UUID 主键)。
  2. 查询集的主键至少在某种程度上是均匀分布的。如果这不是真的,那么adjusted_page_size最终可能会太大,并且您可能会在迭代过程中获得一个或几个大页面。

为了了解开销,我在一个包含 40,000 个条目的 Postgres 表上进行了测试。与原始迭代相比,queryset_iterator 增加了大约 80% 的迭代时间(2.2 秒对 1.2 秒)。对于 200 到 10,000 之间的页面大小,这种开销并没有太大变化,尽管它开始上升到 200 以下。

于 2013-12-12T21:24:36.540 回答
0

还有另一种选择。它不会使迭代更快(实际上它可能会减慢它的速度),但它会使其使用更少的内存。根据您的需要,这可能是合适的。

large_qs = MyModel.objects.all().values_list("id", flat=True)
for model_id in large_qs:
    model_object = MyModel.objects.get(id=model_id)
    # do whatever you need to do with the model here

仅将 id 加载到内存中,并根据需要检索和丢弃对象。请注意增加的数据库负载和较慢的运行时间,这都是减少内存使用量的权衡。

我在工作实例上运行异步计划任务时使用了它,如果它们很慢并不重要,但是如果它们尝试使用太多内存,它们可能会使实例崩溃并因此中止进程。

于 2013-01-07T18:25:24.940 回答