6

我目前正在分析一个维基百科转储文件;我正在使用 python 从中提取一堆数据并将其保存到 PostgreSQL 数据库中。我一直在努力让事情变得更快,因为这个文件很大(18GB)。为了与 PostgreSQL 交互,我使用 psycopg2,但这个模块似乎模仿了许多其他这样的 DBAPI。

无论如何,我有一个关于 cursor.executemany(command, values); 的问题。在我看来,每 1000 个值左右执行一次 executemany 比为这 500 万个值中的每一个调用 cursor.execute(command % value) 更好(请确认或纠正我!)。

但是,你看,我正在使用 executemany 将 1000 行插入到具有唯一完整性约束的表中;这个约束事先没有在 python 中验证,因为这要么需要我一直选择(这似乎适得其反),要么需要我获得超过 3 GB 的 RAM。所有这一切都说明当我的脚本试图通过捕获 psycopg2.DatabaseError 来插入已经存在的行时,我依靠 Postgres 来警告我。

当我的脚本检测到这样的非唯一插入时,它 connection.rollback() (每次最多可生成 1000 行,并且有点使 executemany 毫无价值),然后一一插入所有值。

由于 psycopg2 的文档记录很差(许多很棒的模块也是如此......),我找不到有效的解决方法。我已将每个 executemany 插入的值的数量从 1000 减少到 100,以减少每个 executemany 的非唯一插入的可能性,但我很确定它们是一种告诉 psycopg2 忽略这些异常或告诉光标继续执行。

基本上,这似乎是一种解决方案如此简单和流行的问题,我所能做的就是询问以了解它。

再次感谢!

4

4 回答 4

8

只需使用 psql \copy 命令将所有数据复制到临时表中,或使用 psycopg cursor.copy_in() 方法。然后:

insert into mytable
select * from (
    select distinct * 
    from scratch
) uniq
where not exists (
    select 1 
    from mytable 
    where mytable.mykey = uniq.mykey
);

这将比任何插入组合更快地进行重复数据删除和运行。

-dg

于 2009-02-15T13:13:40.237 回答
5

我遇到了同样的问题,在这里搜索了很多天,收集了很多提示,形成了一个完整的解决方案。即使问题已过时,我希望这对其他人有用。

1)忘记删除索引/约束并在以后重新创建它们的事情,好处是微不足道的或更糟。

2) executemany 比 execute 更好,因为它为您提供了准备语句。您可以使用以下命令自己获得相同的结果,以获得 300% 的速度:

# To run only once:
sqlCmd = """PREPARE myInsert (int, timestamp, real, text) AS
   INSERT INTO myBigTable (idNumber, date_obs, result, user)
     SELECT $1, $2, $3, $4 WHERE NOT EXISTS
     (SELECT 1 FROM myBigTable WHERE (idNumber, date_obs, user)=($1, $2, $4));"""
curPG.execute(sqlCmd)
cptInsert = 0   # To let you commit from time to time

#... inside the big loop:
curPG.execute("EXECUTE myInsert(%s,%s,%s,%s);", myNewRecord)
allreadyExists = (curPG.rowcount < 1)
if not allreadyExists:
   cptInsert += 1
   if cptInsert % 10000 == 0:
      conPG.commit()

此虚拟表示例对 (idNumber, date_obs, user) 具有唯一约束。

3) 最好的解决方案是使用 COPY_FROM 和 TRIGGER 来管理插入前的唯一键。这让我的速度提高了 36 倍。我开始以 500 条记录/秒的速度正常插入。通过“复制”,我每秒获得了超过 18,000 条记录。带有 Psycopg2 的 Python 示例代码:

ioResult = StringIO.StringIO() #To use a virtual file as a buffer
cptInsert = 0 # To let you commit from time to time - Memory has limitations
#... inside the big loop:
   print >> ioResult, "\t".join(map(str, myNewRecord))
   cptInsert += 1
   if cptInsert % 10000 == 0:
      ioResult = flushCopyBuffer(ioResult, curPG)
#... after the loop:
ioResult = flushCopyBuffer(ioResult, curPG)

def flushCopyBuffer(bufferFile, cursorObj):
   bufferFile.seek(0)   # Little detail where lures the deamon...
   cursorObj.copy_from(bufferFile, 'myBigTable',
      columns=('idNumber', 'date_obs', 'value', 'user'))
   cursorObj.connection.commit()
   bufferFile.close()
   bufferFile = StringIO.StringIO()
   return bufferFile

这就是 Python 部分的内容。现在 Postgresql 触发器没有异常 psycopg2.IntegrityError 然后所有 COPY 命令的记录被拒绝:

CREATE OR REPLACE FUNCTION chk_exists()
  RETURNS trigger AS $BODY$
DECLARE
    curRec RECORD;
BEGIN
   -- Check if record's key already exists or is empty (file's last line is)
   IF NEW.idNumber IS NULL THEN
      RETURN NULL;
   END IF;
   SELECT INTO curRec * FROM myBigTable
      WHERE (idNumber, date_obs, user) = (NEW.idNumber, NEW.date_obs, NEW.user);
   IF NOT FOUND THEN -- OK keep it
      RETURN NEW;
   ELSE    
      RETURN NULL; -- Oups throw it or update the current record
   END IF;
END;
$BODY$ LANGUAGE plpgsql;

现在将此函数链接到表的触发器:

CREATE TRIGGER chk_exists_before_insert
   BEFORE INSERT ON myBigTable FOR EACH ROW EXECUTE PROCEDURE chk_exists();

这似乎需要做很多工作,但是当 Postgresql 不必一遍又一遍地解释 SQL 时,它是一个非常快速的野兽。玩得开心。

于 2012-06-15T23:24:35.633 回答
0

“当我的脚本检测到这样一个非唯一的插入时,它 connection.rollback() (每次最多可包含 1000 行,并且有点使 executemany 毫无价值),然后一个一个地插入所有值。”

这个问题真的没有多大意义。

1,000 行的每个块是否由于非唯一行而失败?

1,000 行中的 1 个块是否失败(5,000 个这样的块)?如果是这样,那么执行许多有助于 5,000 中的 4,999 并且远非“毫无价值”。

您是否担心这种非独特的插入?或者你有关于这种情况发生次数的实际统计数据吗?

如果您已经从 1,000 行块切换到 100 行块,那么您显然可以确定 1,000 行块、100 行块和 1 行块是否具有性能优势。

请使用实际数据库和不同大小的块实际运行实际程序并发布数字。

于 2008-12-28T23:06:15.400 回答
-1

使用 MERGE 语句而不是 INSERT 语句可以解决您的问题。

于 2009-03-24T01:32:07.117 回答