3

设置

  • 使用 Django Rest Framework (DRF) 设置的 PostgreSQL 10.6 数据库,最终将接管遗留系统。包含数亿行。
  • 具有旧架构的单个 PostgreSQL 10.6 远程实例。

任务

在合理的时间(分钟)内将旧数据库中的数据导入 Django 数据库,以便可以在 CI 运行期间完成此任务。

工作至今

我们首先创建了一个单独的 Python 脚本来使用 Django API 来导入数据。团队知道这会很慢,但我们想测试 API 并最终得到一些有用的反馈。在我们的开发环境中(在 VM 中),这最终可能每秒创建几十行。

我们现在要导入更多数据。我们首先将脚本移动到 Django 管理命令中,以便我们可以直接实例化 DRF 序列化程序,而不是调用 API。这有点快。

然后我们尝试用 实例化 DRF 序列化程序many=True,但这并没有明显更快。

当前最快的实现可能每秒导入 80-100 行,所以我们决定改变策略。导入器经过全面测试,因此我们决定尝试使用外部数据包装器来使用原始 SQL 导入数据,从其中一个小表开始。生成的 Python 代码如下所示(为了便于阅读而删减):

connection = psycopg2.connect(dsn='[Django DB connection parameters]')
…
with connection.cursor() as cursor:
    cursor.execute('CREATE EXTENSION postgres_fdw')
    cursor.execute(
        "CREATE SERVER legacy FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '{}', dbname '{}', updatable 'false')".format(…)
    )
    cursor.execute(
        "CREATE USER MAPPING FOR postgres SERVER legacy OPTIONS (user '{}', password '{}')".format(…)
    )
    cursor.execute('CREATE SCHEMA IF NOT EXISTS legacy_schema')
    cursor.execute(
        "IMPORT FOREIGN SCHEMA public LIMIT TO (small_table) FROM SERVER legacy INTO legacy_schema"
    )
…
with connection.cursor() as cursor:
    cursor.execute(
        """
        INSERT INTO django_table (…)
        SELECT …
          FROM legacy_schema.small_table
        RETURNING id, …
        """
    )

在 PostgreSQL shell 中手动运行此 SQL 工作正常

# INSERT INTO django_table …
 id | …
----+-…
  3 | …
(1 row)

INSERT 0 1

问题

Python 代码挂在最后execute。此时遗留数据库中的状态如下:

# SELECT query FROM pg_stat_activity WHERE state = 'active';
                                    query                                    
-----------------------------------------------------------------------------
 DECLARE c1 CURSOR FOR SELECT … FROM public.small_table …

在这一点SELECT pg_cancel_backend([pid])上没有做任何事情。在SELECT pg_terminate_backend([pid])Python 代码继续正常运行之后。此外,SELECT上面光标中的查询会立即返回,所以这不是一个简单的大小问题。

我还发现,上面START TRANSACTIONDECLARE语句在遗留数据库中运行得很好,所以我怀疑问题出在 Python 中。strace -f该命令显示了一个无限循环(每 20/30 秒重复一次),如下所示:

<... futex resumed> )       = -1 ETIMEDOUT (Connection timed out)
futex(0x5627e2e8e430, FUTEX_WAKE, 1) = 1
<... futex resumed> )       = 0
futex(0xc420063d48, FUTEX_WAKE, 1 <unfinished ...>
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=20000}, NULL <unfinished ...>
<... futex resumed> )       = 1
<... futex resumed> )       = 0
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=3000}, NULL <unfinished ...>
futex(0xc420062948, FUTEX_WAKE, 1) = 1
<... futex resumed> )       = 0
futex(0x5627e2e92d00, FUTEX_WAIT, 0, {tv_sec=29, tv_nsec=997730142} <unfinished ...>
futex(0xc420062948, FUTEX_WAIT, 0, NULL <unfinished ...>
<... pselect6 resumed> )    = 0 (Timeout)
futex(0xc420063d48, FUTEX_WAIT, 0, NULL <unfinished ...>
<... pselect6 resumed> )    = 0 (Timeout)
futex(0x5627e2e8e430, FUTEX_WAIT, 0, {tv_sec=60, tv_nsec=0}

有趣的是,如果我创建带有名称的游标,我会得到一个非常不同的错误。例如,如果我创建第一个cursor(name='cd1')光标

cursor.execute('CREATE EXTENSION postgres_fdw')
psycopg2.ProgrammingError: syntax error at or near "CREATE"
LINE 1: DECLARE "c1" CURSOR WITHOUT HOLD FOR CREATE EXTENSION postgr...

为什么在为该查询创建/使用游标时 Python 会挂起?

使用来自 PyPI 的 psycopg2-binary 2.7.6.1。


看起来问题在于,当此代码运行时,已经直接建立了与旧数据库的另一个连接,因为当我避免它正确运行时。我会尝试解决这个问题,看看会发生什么。

4

0 回答 0