26

设想

我们有一个非常标准的数据导入过程,在该过程中我们加载一个 staging表,然后MERGE将其放入一个target表中。

新要求(绿色)涉及将导入数据的子集捕获到单独的queue表中以进行完全不相关的处理。

场景架构

“挑战”

(1) 子集由选择的记录组成:仅新插入到target表中的记录。

(2) 子集是一些插入列的投影,但也是至少一个仅存在于源(staging 表)中的列。

(3)MERGE语句已经OUTPUT..INTO严格使用子句来记录$actionstake by MERGE,以便我们可以 统计PIVOT结果和COUNT插入、更新和删除的次数。我们真的不喜欢像那样缓冲整个数据集的动作,而是更喜欢动态聚合总和。不用说,我们不想向该OUTPUT表添加更多数据。

MERGE (4)无论出于何种原因,我们都不想执行第二次执行的匹配工作,即使是部分执行。表 target真的很大,我们不能索引所有的东西,而且操作通常相当昂贵(分钟,而不是秒)。

(5) 我们不考虑将任何输出从MERGE到客户端,以便客户端可以queue通过立即将其发送回来将其路由到 。数据必须保留在服务器上。

(6) 我们希望避免将整个数据集缓冲在 和 之间staging的临时存储中queue

最好的方法是什么?

失败

(a) 仅将插入的记录排入队列的要求使我们无法queue直接在 的OUTPUT..INTO子句中定位表MERGE,因为它不允许任何WHERE子句。我们可以使用一些 CASE技巧来标记不需要的记录,以便后续删除queue而不进行处理,但这似乎很疯狂。

(b) 因为某些用于 的列queue没有出现在 target表中,我们不能简单地在目标表上添加插入触发器来加载queue. “数据流拆分”必须尽快发生。

(c) 由于我们已经在 中使用了一个OUTPUT..INTO子句MERGE,我们也不能添加第二个OUTPUT子句并将嵌套MERGE到 an INSERT..SELECT中来加载队列。这是一种耻辱,因为对于原本效果很好的东西来说,这感觉像是一个完全任意的限制;SELECT过滤器只过滤 $action我们想要的记录 ( )INSERT并将INSERT它们放在queue单个语句中。因此,DBMS 理论上可以避免缓冲整个数据集,而是简单地将其流式传输到queue. (注:我们没有追求,很可能它实际上并没有这样优化计划。)

情况

我们觉得我们已经用尽了我们的选择,但还是决定求助于主脑来确定。我们能想到的只有:

(S1) 创建一个表,VIEWtarget表还包含用于queue唯一数据的可为空列,并让 SELECT语句将它们定义为NULL。然后,设置INSTEAD OF 填充target表和queue 适当的触发器。最后,连接MERGE到目标视图。这行得通,但我们不是该构造的粉丝——它 看起来确实很棘手。

(S2) 放弃,使用另一个将整个数据集缓冲在一个临时表中MERGE..OUTPUT。之后MERGE,立即将临时表中的数据(再次!)复制到queue.

4

6 回答 6

17

我的理解是,主要障碍是OUTPUTSQL Server 中子句的限制。它允许一个OUTPUT INTO table和/或一个OUTPUT将结果集返回给调用者。

您想以MERGE两种不同的方式保存语句的结果:

  • MERGE为收集统计信息而受影响的所有行
  • 仅插入行queue

简单变体

我会使用你的 S2 解决方案。至少开始。它易于理解和维护,并且应该非常高效,因为最耗费资源的操作(MERGE对其Target自身只会执行一次)。下面还有第二个变体,比较它们在真实数据上的表现会很有趣。

所以:

  • 使用OUTPUT INTO @TempTableMERGE
  • 在插入之前从 into或聚合INSERT的所有行。如果您只需要聚合统计信息,那么聚合该批次的结果并将其合并到最终结果中而不是复制所有行是有意义的。@TempTableStatsStats
  • INSERTQueue仅从@TempTable. _

我将从@i-one 的答案中获取样本数据。

架构

-- I'll return to commented lines later

CREATE TABLE [dbo].[TestTarget](
    -- [ID] [int] IDENTITY(1,1) NOT NULL,
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStaging](
    [foo] [varchar](10) NULL,
    [bar] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

CREATE TABLE [dbo].[TestStats](
    [MergeAction] [nvarchar](10) NOT NULL
);

CREATE TABLE [dbo].[TestQueue](
    -- [TargetID] [int] NOT NULL,
    [foo] [varchar](10) NULL,
    [baz] [varchar](10) NULL
);

样本数据

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

合并

DECLARE @TempTable TABLE (
    MergeAction nvarchar(10) NOT NULL,
    foo varchar(10) NULL,
    baz varchar(10) NULL);

MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
    Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;

INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;

INSERT INTO [dbo].[TestQueue]
    ([foo]
    ,[baz])
SELECT
    T.foo
    ,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

结果

TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A   | AA  |
| B   | BB  |
| C   | CC  |
+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C   | CCC |
+-----+-----+

第二种变体

在 SQL Server 2014 Express 上测试。

OUTPUT子句可以将其结果集发送到表和调用者。因此,OUTPUT INTO可以直接进入Stats,如果我们将语句包装MERGE到存储过程中,那么我们可以使用INSERT ... EXEC进入Queue.

如果您检查执行计划,您会发现INSERT ... EXEC无论如何都会在幕后创建一个临时表(另请参阅Adam Machanic的 INSERT EXEC 的隐藏成本),因此我希望在创建临时表时整体性能与第一个变体相似明确地。

还有一个要解决的问题:Queue表应该只有“插入”的行,而不是所有受影响的行。为此,您可以在Queue表上使用触发器来丢弃“插入”以外的行。另一种可能性是定义一个唯一索引IGNORE_DUP_KEY = ON并以这样一种方式准备数据,即“未插入”的行将违反唯一索引并且不会插入到表中。

因此,我将ID IDENTITYTarget表格中添加一TargetID列,然后在表格中添加一列Queue。(在上面的脚本中取消注释它们)。另外,我将在Queue表中添加一个索引:

CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
    [TargetID] ASC
) WITH (
PAD_INDEX = OFF, 
STATISTICS_NORECOMPUTE = OFF, 
SORT_IN_TEMPDB = OFF, 
IGNORE_DUP_KEY = ON, 
DROP_EXISTING = OFF, 
ONLINE = OFF, 
ALLOW_ROW_LOCKS = ON, 
ALLOW_PAGE_LOCKS = ON)

重要的部分是UNIQUEIGNORE_DUP_KEY = ON

这是 的存储过程MERGE

CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    MERGE INTO dbo.TestTarget AS Dst
    USING dbo.TestStaging AS Src
    ON Dst.foo = Src.foo
    WHEN MATCHED THEN
    UPDATE SET
        Dst.bar = Src.bar
    WHEN NOT MATCHED BY TARGET THEN
    INSERT (foo, bar)
    VALUES (Src.foo, Src.bar)
    OUTPUT $action INTO dbo.TestStats(MergeAction)
    OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID, 
    inserted.foo,
    Src.baz
    ;

END

用法

TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];

-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
    ([TargetID]
    ,[foo]
    ,[baz])
VALUES
    (0
    ,NULL
    ,NULL);

INSERT INTO [dbo].[TestStaging]
    ([foo]
    ,[bar]
    ,[baz])
VALUES
    ('A', 'AA', 'AAA'),
    ('B', 'BB', 'BBB'),
    ('C', 'CC', 'CCC');

INSERT INTO [dbo].[TestTarget]
    ([foo]
    ,[bar])
VALUES
    ('A', 'A_'),
    ('B', 'B?');

INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];

SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];

结果

TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
|  1 | A   | AA  |
|  2 | B   | BB  |
|  3 | C   | CC  |
+----+-----+-----+

TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT      |
| UPDATE      |
| UPDATE      |
+-------------+

TestQueue
+----------+------+------+
| TargetID | foo  | baz  |
+----------+------+------+
|        0 | NULL | NULL |
|        3 | C    | CCC  |
+----------+------+------+

期间会有额外的消息INSERT ... EXEC

Duplicate key was ignored.

如果MERGE更新了一些行。INSERT当唯一索引在到期期间丢弃某些行时发送此警告消息IGNORE_DUP_KEY = ON

将重复的键值插入唯一索引时会出现警告消息。只有违反唯一性约束的行才会失败。

于 2016-01-02T13:33:06.847 回答
7

考虑以下两种方法来解决问题:

  • 在单个语句中将数据合并到目标中并插入队列中的输出,并在目标上创建的触发器中汇总统计信息。批次标识符可以通过临时表传递给触发器。
  • 在单个语句中将数据合并到目标中并将输出插入队列中,并在合并后立即汇总统计信息,使用内置的更改跟踪功能,而不是在触发器中进行。

方法 1(在触发器中合并数据并收集统计信息):

示例数据设置(为简单起见省略了索引和约束):

create table staging (foo varchar(10), bar varchar(10), baz varchar(10));
create table target (foo varchar(10), bar varchar(10));
create table queue (foo varchar(10), baz varchar(10));
create table stats (batchID int, inserted bigint, updated bigint, deleted bigint);

insert into staging values
    ('A', 'AA', 'AAA')
    ,('B', 'BB', 'BBB')
    ,('C', 'CC', 'CCC')
    ;

insert into target values
    ('A', 'A_')
    ,('B', 'B?')
    ,('E', 'EE')
    ;

用于收集插入/更新/删除统计信息的触发器:

create trigger target_onChange
on target
after delete, update, insert
as
begin
    set nocount on;

    if object_id('tempdb..#targetMergeBatch') is NULL
        return;

    declare @batchID int;
    select @batchID = batchID from #targetMergeBatch;

    merge into stats t
    using (
        select
            batchID = @batchID,
            cntIns = count_big(case when i.foo is not NULL and d.foo is NULL then 1 end),
            cntUpd = count_big(case when i.foo is not NULL and d.foo is not NULL then 1 end),
            cntDel = count_big(case when i.foo is NULL and d.foo is not NULL then 1 end)
        from inserted i
            full join deleted d on d.foo = i.foo
    ) s
    on t.batchID = s.batchID
    when matched then
        update
        set
            t.inserted = t.inserted + s.cntIns,
            t.updated = t.updated + s.cntUpd,
            t.deleted = t.deleted + s.cntDel
    when not matched then
        insert (batchID, inserted, updated, deleted)
        values (s.batchID, s.cntIns, s.cntUpd, cntDel);

end

合并语句:

declare @batchID int;
set @batchID = 1;-- or select @batchID = batchID from ...;

create table #targetMergeBatch (batchID int);
insert into #targetMergeBatch (batchID) values (@batchID);

insert into queue (foo, baz)
select foo, baz
from
(
    merge into target t
    using staging s
    on t.foo = s.foo
    when matched then
        update
        set t.bar = s.bar
    when not matched then
        insert (foo, bar)
        values (s.foo, s.bar)
    when not matched by source then
        delete
    output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
    ;

drop table #targetMergeBatch

检查结果:

select * from target;
select * from queue;
select * from stats;

目标:

foo        bar
---------- ----------
A          AA
B          BB
C          CC

队列:

foo        baz
---------- ----------
C          CCC

统计:

batchID  inserted   updated   deleted
-------- ---------- --------- ---------
1        1          2         1

方法 2(收集统计数据,使用更改跟踪功能):

示例数据设置与前一种情况相同(只需删除所有内容,包括触发器并从头开始重新创建表),除了在这种情况下我们需要在目标上进行 PK 以使示例工作:

create table target (foo varchar(10) primary key, bar varchar(10));

在数据库上启用更改跟踪:

alter database Test
    set change_tracking = on

在目标表上启用更改跟踪:

alter table target
    enable change_tracking

之后立即合并数据并获取统计信息,通过更改上下文过滤以仅计算受合并影响的行:

begin transaction;
declare @batchID int, @chVersion bigint, @chContext varbinary(128);
set @batchID = 1;-- or select @batchID = batchID from ...;
SET @chVersion = change_tracking_current_version();
set @chContext = newid();

with change_tracking_context(@chContext)
insert into queue (foo, baz)
select foo, baz
from
(
    merge into target t
    using staging s
    on t.foo = s.foo
    when matched then
        update
        set t.bar = s.bar
    when not matched then
        insert (foo, bar)
        values (s.foo, s.bar)
    when not matched by source then
        delete
    output $action, inserted.foo, s.baz
) m(act, foo, baz)
where act = 'INSERT'
    ;

with ch(foo, op) as (
    select foo, sys_change_operation
    from changetable(changes target, @chVersion) ct
    where sys_change_context = @chContext
)
insert into stats (batchID, inserted, updated, deleted)
select @batchID, [I], [U], [D]
from ch
    pivot(count_big(foo) for op in ([I], [U], [D])) pvt
    ;

commit transaction;

检查结果:

select * from target;
select * from queue;
select * from stats;

它们与之前的示例相同。

目标:

foo        bar
---------- ----------
A          AA
B          BB
C          CC

队列:

foo        baz
---------- ----------
C          CCC

统计:

batchID  inserted   updated   deleted
-------- ---------- --------- ---------
1        1          2         1
于 2015-12-30T23:53:51.577 回答
5

AFTER INSERT / DELETE / UPDATE我建议使用以下三个独立触发器来提取统计信息:

create trigger dbo.insert_trigger_target
on [dbo].[target]
after insert
as
insert into dbo.[stats] ([action],[count])
select 'insert', count(1)
from inserted;
go

create trigger dbo.update_trigger_target
on [dbo].[target]
after update
as
insert into dbo.[stats] ([action],[count])
select 'update', count(1) from inserted -- or deleted == after / before image, count will be the same
go

create trigger dbo.delete_trigger_target
on [dbo].[target]
after delete
as
insert into dbo.[stats] ([action],[count])
select 'delete', count(1) from deleted
go

如果您需要更多上下文,请放入一些CONTEXT_INFO内容并将其从触发器中取出。

现在,我将断言 AFTER 触发器并不昂贵,但您需要对其进行测试才能确定。

处理完这个问题后,您可以自由地在 the 中使用OUTPUT子句 ( NOT OUTPUT INTO ),MERGE然后使用嵌套在 select 中的子句来对要进入queue表的数据进行子集化。

理由

由于需要从两者访问列stagingtarget为 构建数据queue,因此必须使用 中的选项来完成OUTPUT操作MERGE,因为没有其他任何东西可以访问“双方”。

那么,如果我们劫持了OUTPUTfor 子句queue,我们如何重新处理这个功能呢?考虑到您所描述的统计数据要求,我认为AFTER触发器会起作用。事实上,考虑到可用的图像,如果需要,统计数据可能会非常复杂。我断言AFTER触发器“不那么昂贵”,因为之前和之后的数据必须始终可用,以便事务可以同时提交回滚 - 是的,需要扫描数据(甚至获取计数)但这似乎并没有太大的成本。

在我自己的分析中,扫描使执行计划的基本成本增加了大约 5%

听起来像一个解决方案?

于 2016-01-04T09:11:35.170 回答
3

通过临时表导入可能会更有效地使用顺序而不是面向集合的处理。我会考虑MERGE用游标扫描重写为存储过程。然后,对于每条记录,您可以拥有任意数量的输出以及任何不使用数据透视表的计数,总成本为一次staging表扫描。

存储过程还可能提供将处理拆分为较小事务的机会,而对较大数据集的触发器可能会导致事务日志溢出。

于 2016-01-04T09:45:57.227 回答
3

您是否考虑过放弃合并,只在不存在的地方进行插入和更新?然后,您可以使用插入的输出子句来填充您的队列表。

于 2016-01-01T22:08:08.190 回答
2

除非我遗漏了什么,否则一个简单的插入命令应该可以满足您的所有要求。

insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging join target on staging.foo = target.boo
where whatever

这将在合并到目标之后发生。

仅对于新记录,请在合并之前执行此操作

insert into queue
(foo, baz)
select staging.foo, staging.baz
from staging left join target on staging.foo = target.boo
where target.foo = null
于 2015-12-30T17:05:36.430 回答