0

我有一个关于序列的补救问题。我已经使用了一些它们并查阅了文档,并希望这对小组来说是一个简单的问题。我们现在使用的是 Postgres 11.4,只要它在 RDS 上可用,我们就会迁移到 PG 12。

目标是让一组数字在每次插入或更新一行时增加。我们为这种计数器使用字段名称“con_id”(并发 ID)。所以第一次在空表中插入一行时,值为 1,第二行得到 2,依此类推。听起来像SEQUENCE. 我在这个角色中有一个标准序列,然后切换到AS IDENTITY......但现在意识到这可能是一个错误。

更新时,计数器应继续工作。因此,如果第一行被更新,con_id 从 1 变为 3 current max()+1,. ON CONFLICT(id) SET作为记录,我们所有的更新都使用UPDATE.

数字系列的要点是为各种操作定义起止界限:

operation     last_number_processed
sync_to_domo  124556
rollup_day    123516
rollup_week   103456

然后,当需要执行这些操作之一时,您只需从 last_number_processed+1 到 max(con_id) 中选择 con_id 即可找到正确的记录块。操作完成后,您必须使用该 max(con_id) 更新操作跟踪器。

select max(con_id) from source_data; -- Get the current highest number.

然后,“sync_to_domo”的范围类似于 124557-128923。

这里不需要唯一性,尽管它是可取的。差距根本不重要。保持数字顺序是必不可少的。

如果我搞砸了,这种更新操作很容易成为可怕的瓶颈。谁能建议最佳可靠、低争用策略来维护一个计数器,该计数器在每次插入或更新时从表 +1 中获取最大值?

而且,是的,时间戳可以用于此目的,它只是另一种数字线。整数的原因是为了匹配我们编码其他系统的方式。当跨平台的数据类型保持相同时,更容易解释和推理这些东西。或者看起来,在这种情况下。

测试代码

我正在向我的原始问题添加一些测试代码和结果。这行得通,但我怀疑它是否超级有效。下面的测试是一个最小版本,并没有用......我只是想确定我是否可以获得越来越多的插入和修订。快速检查一下,这项工作看起来还不错,但是我没有内化 Postgres 的争用方法,所以我主要包括它,以便人们可以告诉我为什么它很糟糕。所以,请做;-)

设置是有一个自动分配的序列 onINSERT和通过 per ROWtrigger on UPDATE。听起来不太理想,有没有更好的方法?它在单连接测试中工作,但在UPDATE. 这对我来说不是问题,但我不明白为什么会这样。

这是独立的测试代码:

DROP TABLE IF EXISTS data.test_con_ids;
DROP SEQUENCE IF EXISTS test_con_ids_sequence;
DROP FUNCTION IF EXISTS data.test_con_ids_update;

BEGIN;
CREATE SEQUENCE data.test_con_ids_sequence 
   AS bigint;

CREATE TABLE data.test_con_ids (
    id integer NOT NULL DEFAULT NULL PRIMARY KEY,
    con_id bigint NOT NULL DEFAULT NEXTVAL ('test_con_ids_sequence'),
    dts timestamptz default NOW()
);


CREATE OR REPLACE FUNCTION data.test_con_ids_update()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
BEGIN
    NEW.con_id := NEXTVAL ('test_con_ids_sequence');
 RETURN NEW;
END
$function$;

    -- It's late here, not sure if I could use a FOR EACH STATEMENT trigger here, which should be faster. If it would work.
    CREATE TRIGGER test_con_ids_update_trigger  BEFORE UPDATE ON data.test_con_ids
        FOR EACH ROW EXECUTE PROCEDURE test_con_ids_update();

-- Add ten records, IDs 1-10, con_ids 1-10.     
    INSERT INTO data.test_con_ids (id) 
          VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);

-- Update rows 1-5 to their current values. The trigger should increment the con_id.   
    INSERT INTO data.test_con_ids (id) 
          VALUES (1),(2),(3),(4),(5)
          ON CONFLICT(id) DO UPDATE set id = EXCLUDED.id; -- Completely pointless, obviously...just a test. We always UPSERT with ON CONFLICT DO UPDATE.

    COMMIT;

结果如下:

id  con_id  dts
1   12  2019-11-02 21:52:34.333926+11
2   14  2019-11-02 21:52:34.333926+11
3   16  2019-11-02 21:52:34.333926+11
4   18  2019-11-02 21:52:34.333926+11
5   20  2019-11-02 21:52:34.333926+11
6   6   2019-11-02 21:52:34.333926+11
7   7   2019-11-02 21:52:34.333926+11
8   8   2019-11-02 21:52:34.333926+11
9   9   2019-11-02 21:52:34.333926+11
10  10  2019-11-02 21:52:34.333926+11 

这行得通,创建 1-10,更新 1-5 并使其 con_id 计数器值递增。2 出于某种原因(?),但至少它们的顺序是有用的,这是我们需要的。

任何人都可以就如何更有效地获得这种行为提供建议吗?目标是为反映最后一次INSERTUPDATE活动的记录建立一个不断增加的数字线。而且,因为我们在其他任何地方都使用整数,所以我们试图坚持使用整数而不是时间戳。但是,老实说,这在很多方面都是装饰性的。我正在研究的另一个原因SEQUENCE是,除非我误解了,否则它不会在交易中绑定。这非常适合……我们不需要一个无间隙的数字系列,只需要一个连续的数字系列。

Postgres 12 测试

按照 Belayer 的建议,我创建了一个 PG 12 数据库作为实验。我使用默认值,所以一切都在public. (在现实世界中,我去掉了public。)是的,生成的列似乎可以工作,只要你有一个不可变的函数。我在 Postgres 中读过IMMUTABLE好几次……但我不明白。所以,我不能说这个功能是安全的。似乎应该如此。我遵循了这篇有价值的文章中使用的模式:

https://www.2ndquadrant.com/en/blog/generated-columns-in-postgresql-12/

CREATE OR REPLACE FUNCTION public.generate_concurrency_id() RETURNS bigint
AS $$
    SELECT EXTRACT(EPOCH FROM clock_timestamp())::bigint; 
$$
LANGUAGE sql IMMUTABLE;

COMMENT ON FUNCTION public.generate_concurrency_id() IS 'Generate a bigint to act as a progressive change counter for a table.';

DROP TABLE IF EXISTS public.test_con_ids;
CREATE TABLE test_con_ids (
    id integer NOT NULL DEFAULT NULL PRIMARY KEY,
    con_id bigint GENERATED ALWAYS AS (generate_concurrency_id()) STORED,
    dts timestamptz default NOW()
);

-- Add ten records, IDs 1-10, con_ids 1-10.     
    INSERT INTO public.test_con_ids (id) 
          VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);

-- Unless you wait here, the con_ids all look the same...this operation is way too short to tick over to another second, unless you get lucky.
-- Update rows 1-5 to their current values. The trigger should increment the con_id.   
    INSERT INTO public.test_con_ids (id) 
          VALUES (1),(2),(3),(4),(5)
          ON CONFLICT(id) DO UPDATE set id = EXCLUDED.id; -- Completely pointless, obviously...just a test. We always UPSERT with ON CONFLICT DO UPDATE.

上面的示例确实适用于生成的列,但我不知道 PG 12 计算列与触发器的性能特征。

更可悲的是,我认为这对我来说可能根本行不通。我真的需要一个提交时间戳,它只能通过逻辑解码获得。这就是为什么,通过一个例子。

  • 我正在汇总数据并希望获取未处理的更新或插入的行。
  • 我得到的最后一个数字来自于上午 01:00:00 添加的记录。现在我想得到所有更高的数字。
  • 好吧...我这样做。
  • 哦,等等,一个不完整的交易进来并提交了之前的时间戳/派生数字。

ASEQUENCE在这里也不起作用,因为在事务提交之前,我的收集器进程再一次看不到行/元组。

所以我认为这是逻辑解码,更新汇总表,或者破产。

4

2 回答 2

1

这是序列的一个明确用例。

CREATE FUNCTION con_id_update() RETURNS trigger
LANGUAGE plpgsql AS $$
  BEGIN
    new.con_id := nextval(pg_get_serial_sequence(TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME, 'con_id'));
    RETURN new;
  END;
$$;

CREATE TABLE example (
  id bigserial PRIMARY KEY,
  con_id serial NOT NULL
);

CREATE TRIGGER con_id BEFORE UPDATE ON example
EXECUTE FUNCTION con_id_update();

或类似的东西。

于 2021-08-09T06:05:52.707 回答
0

悲伤和可怕的真相

我意识到我对交易一无所知,而且这些方案都行不通。这不仅不能用于UPDATE,它也不能用于INSERT任何一个,这有点令人惊讶。如果我这里有什么问题,请告诉我在哪里。

作为一个思想实验,想象一个表上的三个并发的、未提交的事务,其中 concurrency_id 编号在INSERTor上递增UPDATE。这是一张简单的图片。

在此处输入图像描述

现在,max(concurrency_id)表中的内容是什么?这是没有意义的,因为尚未提交任何事务。如果您使用的是 a sequence,您可以抓住 thenextval然后知道数字较小的任何东西都是“较早的”。好吧,想象一下我max在生产中使用,我会得到什么?取决于我什么时候问,以及交易的顺序/状态。这是一个小真值表,其中max()包含nextval()各种组合的结果:

Scenario    T1            T2            T3            Nextval()   Max()
1           Committed     Committed     Committed     16             15
2           Committed     Committed     Open          16             10
3           Committed     Open          Committed     16             15
4           Committed     Open          Open          16              5
5           Open          Committed     Committed     16             15
6           Open          Committed     Open          16             10
7           Open          Open          Committed     16             15
8           Open          Open          Open          16              0

这里的整个目标是在数字线(或时间线)上放置标记以将已处理的行括起来。处理的状态不是行的属性,因为相同的行可以用于许多操作。例如多个汇总、同步到各种来源、归档等。

如果我想知道要处理什么,我需要最后处理的值,开始时为 0/NULL,以及要处理的最大值。如果我在场景 1、3、5 或 7 中运行检查,我会得到相同的最大值 15。但是,此时提交的记录会有所不同

Scenario    T1            T2            T3            Nextval()   Max()    Values
1           Committed     Committed     Committed     16             15    1-15
3           Committed     Open          Committed     16             15    1-5 and 11-15
5           Open          Committed     Committed     16             15    6-15
7           Open          Open          Committed     16             15    11-15

在每种情况下,处理窗口跟踪器都会存储最大值,每种情况下都是 15。到目前为止没有伤害。但是当我下次运行检查并希望下次找到所有未处理的行时会发生什么?他们看着> 16。最终结果是,只有场景 1 有效,在我运行检查后,所有其他的最终都会得到具有更早数字/时间戳的记录,因此这些数字永远不会被捕获。据我所知,使用序列无济于事。虽然序列不是事务绑定的,但它表示的是事务绑定的。并且时间戳没有帮助,因为没有可用的实际提交顺序时间戳功能。

我认为唯一的解决方案(正如我认为其他人已经在其他地方对我说过的那样)是逻辑解码或复制。不同之处在于复制流保证按播放、提交顺序。

上面的笔记是一个思想实验(ish)。如果 St. Dijkstra 给了我们任何东西,那就是我们可以而且应该使用思想实验来推理并发问题。如果我推理错误,忽略了什么等,请回答或评论。

于 2019-11-05T02:20:00.280 回答