4

我编写了一个 Python 脚本,它采用 1.5 G XML 文件,解析数据并使用 copy_from 将其提供给数据库。它每 1000 个解析的节点调用以下函数。总共有大约 170k 个节点更新大约 300k 行或更多。它开始时非常快,然后随着时间的推移逐渐变慢。关于为什么会发生这种情况以及我能做些什么来解决它的任何想法?

这是我将数据提供给数据库的函数。

 def db_update(val_str, tbl, cols):

    conn = psycopg2.connect("dbname=<mydb> user=postgres password=<mypw>")
    cur = conn.cursor()
    output = cStringIO.StringIO()
    output.write(val_str)
    output.seek(0)
    cur.copy_from(output, tbl, sep='\t', columns=(cols))
    conn.commit()

我没有包含 xml 解析,因为我认为这不是问题。如果没有 db,解析器将在 2 分钟内执行。

4

3 回答 3

2

随着表的增长,有几件事会减慢插入速度:

  • 随着数据库的增长,触发器必须做更多的工作
  • 索引,随着它们的增长,更新成本会更高

禁用任何非关键触发器,或者如果不可能重新设计它们以在恒定时间内运行。

删除索引,然后在加载数据后创建它们。如果您需要实际INSERTs 或UPDATEs 的任何索引,则需要保持它们的磨损成本。

如果您正在做很多UPDATEs,请考虑VACUUM定期对 table 进行操作,或者将 autovacuum 设置为非常积极地运行。这将有助于 Pg 重用空间,而不是从文件系统中更昂贵地分配新空间,并有助于避免表膨胀。

您还可以通过不为每个工作块重新连接来节省时间。保持连接。

于 2012-08-25T03:43:03.470 回答
0

根据个人经验,copy_from 在你提交任何内容后不会更新任何索引,所以你必须稍后再做。当您完成插入所有内容时,我会将您移出conn = psycopg2.connect("dbname=<mydb> user=postgres password=<mypw>"); cur = conn.cursor()函数并执行 commit() (我建议每 ~ 100k 行提交一次,否则它会开始变慢)。

此外,它可能看起来很愚蠢,但它经常发生在我身上:确保在调用 db_update 后重置 val_str。对我来说,当 copy_from /inserts 开始变慢时,这是因为我插入了相同的行加上更多的行。

于 2012-08-30T16:52:22.693 回答
0

我使用以下内容,据我所见,我的性能没有受到任何影响:

import psycopg2
import psycopg2.extras

local_conn_string = """
    host='localhost'
    port='5432'
    dbname='backupdata'
    user='postgres'
    password='123'"""
local_conn = psycopg2.connect(local_conn_string)
local_cursor = local_conn.cursor(
    'cursor_unique_name',
     cursor_factory=psycopg2.extras.DictCursor)

我在代码中进行了以下输出来测试运行时(并且我正在解析很多行。超过 30.000.000)。

Parsed 2600000 rows in 00:25:21
Parsed 2700000 rows in 00:26:19
Parsed 2800000 rows in 00:27:16
Parsed 2900000 rows in 00:28:15
Parsed 3000000 rows in 00:29:13
Parsed 3100000 rows in 00:30:11

我不得不提到我不“复制”任何东西。但是我正在将我的行从远程 PostGreSQL 移动到本地行,并且在此过程中创建了更多的表来更好地索引我的数据,因为 30.000.000+ 在常规查询中处理起来有点太多了。

注意:时间是向上计数的,不是针对每个查询的。

我相信这与 mycursor的创建方式有关。

编辑1:

我正在使用以下内容来运行我的查询:

local_cursor.execute("""SELECT * FROM data;""")

row_count = 0
for row in local_cursor:
    if(row_count % 100000 == 0 and row_count != 0):
        print("Parsed %s rows in %s" % (row_count,
                                        my_timer.get_time_hhmmss()
                                        ))
    parse_row(row)
    row_count += 1

print("Finished running script!")
print("Parsed %s rows" % row_count)

my_timer是我制作的一个计时器类,该parse_row(row)函数格式化我的数据,将其传输到我的本地数据库,并在数据被验证为已移动到我的本地数据库后最终从远程数据库中删除。

编辑2:

即使在解析了大约 4.000.000 个查询之后,解析我的数据库中每 100.000 行大约需要 1 分钟:

Parsed 3800000 rows in 00:36:56
Parsed 3900000 rows in 00:37:54
Parsed 4000000 rows in 00:38:52
Parsed 4100000 rows in 00:39:50
于 2016-11-22T13:49:53.000 回答