7

我有一个表,其 ID 对处于传递关系 t中,即如果“A t B”和“B t C”则为“A t C”。样本:

  table T1
  ID1 | ID2 
  1   | 2
  1   | 5
  4   | 7
  7   | 8
  9   | 1

所以有两组,

  • g1: {1,2,5,9} 因为“1 t 2”、“1 t 5”和“9 t 1”
  • g2: {4,7,8} 因为“4 t 7”和“7 t 8”

我需要通过“纯标准 SQL”生成一个新表或视图:

  table T2
  ID1 | ID2 | LABEL 
  1   | 2   | 1
  1   | 5   | 1
  4   | 7   | 2
  7   | 8   | 2
  9   | 1   | 1

PS -1:我们可以列出“传递组”

  SELECT DISTINCT label, id   
  FROM (SELECT id1 as id, * FROM T2) UNION (SELECT id2 as id, * FROM T2)
  ORDER BY 1,2;

PS -2:我使用的是 PostgreSQL 9.1,但如果有“标准 SQL”的解决方案,我更喜欢。

4

2 回答 2

4

您可以在 Postgres 中执行此操作;您不能在所有数据库中都这样做。这是查询:

with 
    recursive cte(id1, id2) as (
     select id1, id2, 1 as level
     from t
     union all
     select t.id1, cte.id2, cte.level + 1
     from t join
          cte
          on t.id2 = cte.id1
  )
select id1, id2,
       dense_rank() over (order by grp) as label
from (select id1, id2,
             least(min(id2) over (partition by id1), min(id1) over (partition by id2)) as grp,
             level
      from cte
     ) t
where level = 1;

在这里使用 SQL Fiddle 。

您正在遍历树结构以分配标签(顺便说一下,循环可能会给这个特定版本带来问题)。在 Postgres 中,您可以使用显式recursiveCTE 来执行此操作。在 SQL Server 中,您可以使用隐式“递归”(不使用关键字)的 CTE 来执行此操作。在 Oracle 中,您可以使用connect by.

递归 CTE 获取所有相互连接的对。然后,主查询将 id1 和 id2 的最小值分配给该对,以识别所有相互连接的对。最终的标签是通过为grp.

编辑:

叶戈尔提出了一个很好的观点。以上假设ID“下降”到较小的值。以下版本改为使用每个 id 的最高级别进行分组(这确实是预期的):

with 
    recursive cte(id1, id2) as (
     select id1, id2, 1 as level
     from t
     union all
     select t.id1, cte.id2, cte.level + 1
     from t join
          cte
          on t.id2 = cte.id1
    --  where not exists (select 1 from cte cte2 where cte2.id1 = t.id1 and cte2.id2 = t.id2) 
  ) 
select id1, id2,
       dense_rank() over (order by topvalue) as label
from (select id1, id2,
             first_value(id2) over (partition by id1 order by level desc) as topvalue,
             level
      from cte
     ) t
where level = 1;

编辑二:

回应叶戈尔的第二条评论。这个数据相对于原始问题有点问题。以下将其分为两部分:

with 
    recursive cte as (
     select id1, id2, id2 as last, id1||','||id2 as grp, 1 as level
     from t
     where id2 not in (select id1 from t)
     union all
     select t.id1, t.id2, cte.last, cte.grp, cte.level + 1
     from t join
          cte
          on t.id2 = cte.id1
    --  where not exists (select 1 from cte cte2 where cte2.id1 = t.id1 and cte2.id2 = t.id2) 
  ) 
select *
from cte;

但是,目前尚不清楚这是否是原始人想要的。它将原始分成三个重叠的组,因为第二列中有三个 id 永远不会在第一列中。这里的问题是关于交换性的。

于 2013-08-03T14:03:35.183 回答
4

现在,2013 年的一个新需求,我需要使用 10000个项目:使用@GordonLinoff 的优雅解决方案(上图),1000 个项目需要 1 秒,2000 个项目需要 1 天......没有很好的性能。性能问题也记在这里

@NealB 算法

(这是最好的解决方案,速度如此之快!)请参阅原始和教学描述。这里的表格T1与问题文本相同,第二个(临时)表格R用于处理和显示结果,

 CREATE TABLE R (
   id integer NOT NULL, -- PRIMARY KEY,
   label integer NOT NULL DEFAULT 0
 );
 CREATE FUNCTION t1r_labeler() RETURNS void AS $funcBody$
   DECLARE
      label1 integer;
      label2 integer;
      newlabel integer;
      t t1%rowtype;
   BEGIN
       DELETE FROM R;
       INSERT INTO R(id) 
           SELECT DISTINCT unnest(array[id1,id2]) 
           FROM T1 ORDER BY 1;
      newlabel:=0;
      FOR t IN SELECT * FROM t1
      LOOP   -- --  BASIC LABELING:  -- --
         SELECT label INTO label1 FROM R WHERE id=t.id1;
         SELECT label INTO label2 FROM R WHERE id=t.id2;
         IF label1=0 AND label2=0 THEN 
              newlabel:=newlabel+1;
              UPDATE R set label=newlabel WHERE ID in (t.id1,t.id2);
         ELSIF label1=0 AND label2!=0 THEN 
              UPDATE R set label=label2 WHERE ID=t.id1;
         ELSIF label1!=0 AND label2=0 THEN 
              UPDATE R set label=label1 WHERE ID=t.id2;
         ELSIF label1!=label2 THEN -- time consuming
              UPDATE tmp.R set label=label1 WHERE label = label2;
         END IF;
      END LOOP;
    END;
 $funcBody$ LANGUAGE plpgsql VOLATILE; 

准备和运行,

 -- same CREATE TABLE T1 (id1 integer, id2 integer);
 DELETE FROM T1; 
 INSERT INTO T1(id1,id2)  -- populate the standard input
 VALUES (1, 2), (1, 5), (4, 7), (7, 8), (9, 1);
   -- or  SELECT id1, id2 FROM table_with_1000000_items;

 SELECT t1r_labeler();       -- run
 SELECT * FROM R ORDER BY 2; -- show

处理最坏的情况

最后一个条件 whenlabel1!=label2是最耗时的操作,必须避免或在高连通性的情况下可以分离,这是最糟糕的情况。

要报告某种警报,您可以计算程序运行最后一个条件的次数比例,并且/cor 可以分隔最后一次更新。

如果分开,您可以更好地分析和处理它们因此,消除最后一个ELSIF并在第一个循环之后添加您的检查和第二个循环:

      -- ... first loop and checks here ...
      FOR t IN SELECT * FROM tmp.t1 
      LOOP   -- --  MERGING LABELS:  -- --
         SELECT label INTO label1 FROM R WHERE id=t.id1;
         SELECT label INTO label2 FROM R WHERE id=t.id2;
         IF label1!=0 AND label2!=0 AND label1!=label2 THEN
             UPDATE R set label=label1 WHERE label=label2;
         END IF;
      END LOOP;
      -- ...

最坏情况的示例:具有超过 1000 个(连接的)节点的组变成 10000 个节点,平均长度为“每个标记组 10 个”(核心)并且只有很少的路径连接核心。

面向数组的算法

这种另一种解决方案速度较慢(是一种蛮力算法),但是当您需要直接处理数组并且不需要如此快速的解决方案(并且没有“最坏情况”)时可以使用。

正如@peter.petrov 和@RBarryYoung 建议使用更充分的数据结构......我回到我的数组作为“更充分的数据结构”。毕竟,使用下面的解决方案 (!) 有很好的加速(与 @GordonLinoff 的算法相比)

第一步是将t1问题文本表转换为临时表transgroup1,我们可以在其中计算新过程,

 -- DROP table transgroup1;
 CREATE TABLE transgroup1 (
   id serial NOT NULL PRIMARY KEY,
   items integer[], -- two or more items in the transitive relationship
   dels integer[] DEFAULT array[]::integer[]
 );
 INSERT INTO transgroup1(items)
   SELECT array[id1, id2] FROM t1; -- now suppose t1 a 10000 items table;

他们,通过这两个功能我们可以解决问题,

 CREATE FUNCTION array_uunion(anyarray,anyarray) RETURNS anyarray AS $$
     -- ensures distinct items of a concatemation
    SELECT ARRAY(SELECT  unnest($1) UNION SELECT  unnest($2))
 $$ LANGUAGE sql immutable;


  CREATE FUNCTION transgroup1_loop() RETURNS void AS
  $BODY$
    DECLARE
       cp_dels integer[];
       i integer;
       max_i integer;
    BEGIN
       i:=1;
       max_i:=10; -- or 100 or more, but need some control to be secure
       LOOP
           UPDATE transgroup1
           SET items = array_uunion(transgroup1.items,t2.items),
               dels =  transgroup1.dels || t2.id
           FROM transgroup1 AS t1, transgroup1 AS t2
           WHERE transgroup1.id=t1.id AND t1.id>t2.id AND t1.items && t2.items;

           cp_dels := array(
               SELECT DISTINCT unnest(dels) FROM transgroup1
           ); -- ensures all itens to del
           EXIT WHEN i>max_i OR array_length(cp_dels,1)=0;

           DELETE FROM transgroup1 WHERE id IN (SELECT unnest(cp_dels));
           UPDATE transgroup1 SET dels=array[]::integer[];
           i:=i+1;
       END LOOP;
       UPDATE transgroup1         -- only to beautify
       SET items = ARRAY(SELECT unnest(items) ORDER BY 1 desc);
     END;
  $BODY$ LANGUAGE plpgsql VOLATILE;

当然,要运行并查看结果,您可以使用

 SELECT transgroup1_loop(); -- not 1 day but some hours!     
 SELECT *, dense_rank() over (ORDER BY id) AS group from transgroup1;

结果

 id |   items   | ssg_label | dels | group 
----+-----------+-----------+------+-------
  4 | {8,7,4}   | 1         | {}   |     1
  5 | {9,5,2,1} | 1         | {}   |     2
于 2013-12-16T10:21:54.687 回答