我在几个表上设置了一个 SQL Server CDC。一旦启动 CDC,就会填充 cdc 表。我想处理这些更改并为发生的每个更改生成 MQ 消息,以将其发送到外部消息队列。
处理这些数据的最佳方法是什么。我看过一些像 sqdata 这样的产品,但我在想是否有更好的方法。我已经使用服务代理查看了 CDC,但这会生成仅发送到外部应用程序的消息。
我的另一个问题是,当 CDC 更改生成消息时,我希望它删除更改数据,以便如果我想扩展此处理服务,那么它不应该处理已处理的数据。
我在几个表上设置了一个 SQL Server CDC。一旦启动 CDC,就会填充 cdc 表。我想处理这些更改并为发生的每个更改生成 MQ 消息,以将其发送到外部消息队列。
处理这些数据的最佳方法是什么。我看过一些像 sqdata 这样的产品,但我在想是否有更好的方法。我已经使用服务代理查看了 CDC,但这会生成仅发送到外部应用程序的消息。
我的另一个问题是,当 CDC 更改生成消息时,我希望它删除更改数据,以便如果我想扩展此处理服务,那么它不应该处理已处理的数据。
使用的 CDC 基于使用 LSN 来找出您在流中处理的内容。您需要跟踪以某种方式处理的时间间隔(我喜欢将它们粘贴在数据库的表中)。该表看起来像:
create table dbo.CDCProcessing (
ID int identity not null,
CaptureInstance sysname not null,
FarEndpoint binary(10),
IsProcessed bit
);
create unique index [OnlyOneOpenRange]
on dbo.CDCProcessing (CaptureInstance)
where IsProcessed = 0;
您的处理循环将如下所示(对于每个捕获实例):
where IsProcessed = 0
”sys.fn_cdc_max_lsn()
。如果您找到了,只需像您自己插入一样使用它。cdc.fn_cdc_get_all_changes_<capture_instance>
使用或根据您的情况处理间隔cdc.fn_cdc_get_net_changes_<capture_instance>
。无论哪种方式,您都需要 from_lsn 的值。如果 dbo.CDCProcessing 表中有此 CaptureInstance 的行,则获取具有最大 FarEndpoint 值的行并调用sys.fn_cdc_increment_lsn
它。如果没有行,请调用sys.fn_cdc_get_min_lsn
此捕获实例。如果您处于间隔中间并且中止,则上述内容非常愚蠢。也就是说,您最终可能会多次处理某些 CDC 记录。如果这对您很重要,您可以修改流程以考虑下游系统处理的最后一条消息是什么,并相应地更新 CDCProcessing 表。但这留给读者作为练习。
至于您关于清除的其他问题,这不是它的工作原理。设置 CDC 时,应创建一个保持滚动间隔的作业(我认为它默认为 3 天)。该作业定期运行并将 CDC 数据修剪到保留间隔。因此,假设该作业运行,您不必担心它。
最近,我使用虚拟 cdc 实例创建了一个解决方案(由此,无限数量的单独和不同的实例可以使用单个真实捕获实例作用于单个目标表,这是我需要克服的问题之一。
我的解决方案是使用持久化数据表从 CT 表接收数据,然后从那里处理数据。通过使用每 30 秒运行一次的作业以及存储实例名称和最后一个 LSN 的表进行设置,我可以将数据存储在同一客户端数据库或单独的公共数据库上的持久存储中(打开或关闭同一个 sql 实例)。
这使我可以随时指定清理并减少本地存储的数据量。
对于处理方面,这将需要您使用在创建实例时创建的 cdc 函数。通过使用 [ $update_mask] 列,您可以确定实际的列更改,并且只需将这些值与 [ $start_lsn]/__$seqval、时间戳(获得来自 LSN)。
要破译 update_mask 列和时间戳,可以使用以下代码:
SELECT
[__$start_lsn] ,
[__$end_lsn] ,
[__$seqval] ,
[__$operation] ,
[__$update_mask] ,
sys.fn_cdc_map_lsn_to_time(__$start_lsn) [RowTimestamp],
reverse(stuff(reverse(
( SELECT CC.column_name + ','
FROM cdc.captured_columns CC
INNER JOIN cdc.change_tables CT ON CC.[object_id] = CT.[object_id]
WHERE capture_instance = 'MyCaptureInstanceName'
AND sys.fn_cdc_is_bit_set(CC.column_ordinal, __$update_mask) = 1
FOR
XML PATH('')
))
,1,1,'')) [ChangedColumns],
SYSDATETIME() [CopyTs]
FROM cdc.MyCaptureInstanceName_CT
WHERE [__$start_lsn] > (SELECT ISNULL(MAX(MaxLSN),0) FROM MyLogTable WHERE CaptureInstance = 'MyCaptureInstanceName')