5

我们有一个 ETL 管道,它针对上传到存储帐户 (Azure) 的每个 CSV 运行。它在 CSV 上运行一些转换并将输出写入另一个位置,也作为 CSV,并调用数据库 (SQL Azure) 上的存储过程,将生成的 CSV 摄取(BULK INSERT)到临时表中。

该管道可以同时执行,因为多个资源可以将文件上传到存储。因此,临时表经常插入数据。

然后,我们有一个计划的 SQL 作业(弹性作业),它触发一个将数据从暂存表移动到最终表的 SP。此时,我们希望截断/清空临时表,以便我们不会在下一次执行作业时重新插入它们。

问题是,我们不能确定在从登台表加载到最终表和 truncate 命令之间,没有任何新数据写入登台表,可以在没有先插入到最终表中的情况下被截断。

有没有办法在我们将数据复制到最终表中时锁定临时表,以便尝试写入它的 SP(从 ETL 管道调用)只会等到锁被释放?这可以通过使用事务或一些手动锁定命令来实现吗?

如果没有,处理此问题的最佳方法是什么?

4

5 回答 5

1

我喜欢sp_getapplock并在少数地方自己使用这种方法,因为它具有灵活性,并且您可以完全控制锁定逻辑和等待时间。

我看到的唯一问题是,在您的情况下,并发进程并不完全相同。

您拥有将数据从临时表移动到主表的 SP1。您的系统从不尝试运行此 SP 的多个实例。

另一个将数据插入临时表的 SP2可以同时运行多次,这样做很好。

很容易实现阻止任何 SP1 或 SP2 组合的任何并发运行的锁定。换句话说,如果 SP1 和 SP2 的锁定逻辑相同并且它们被视为相同,则很容易。但是,您不能同时运行多个 SP2 实例。

如何实现锁定以防止 SP1 和 SP2 并发运行,同时允许多个 SP2 实例同时运行,这一点并不明显。


还有另一种方法不尝试阻止 SP 的并发运行,但接受并期望同时运行是可能的。

一种方法是IDENTITY在临时表中添加一列。或者一个自动填充的日期时间,如果你可以保证它是唯一的并且永远不会减少,这可能会很棘手。或rowversion列。

SP2 中将数据插入临时表的逻辑不会改变。

SP1 内部将数据从临时表移动到主表的逻辑需要使用这些标识值。

首先从 staging 表中读取 identity 的当前最大值,并将其记住在一个变量中,比如@MaxID. 该 SP1 中临时表中的所有后续 SELECT、UPDATE 和 DELETE 都应包含一个 filter WHERE ID <= @MaxID

这将确保如果在 SP1 运行时恰好有新行添加到暂存表中,则该行不会被处理并且将保留在暂存表中,直到 SP1 的下一次运行。

这种方法的缺点是你不能使用TRUNCATE,你需要使用DELETEwith WHERE ID <= @MaxID


如果您可以接受多个 SP2 实例(和 SP1)相互等待(和 SP1),那么您可以使用sp_getapplock类似于以下内容。我的存储过程中有这段代码。您应该将此逻辑放入 SP1 和 SP2 中。

sp_releaseapplock这里没有显式调用,因为锁的所有者设置为事务,并且引擎会在事务结束时自动释放锁。

您不必将重试逻辑放在存储过程中,它可以在运行这些存储过程的外部代码中。在任何情况下,您的代码都应该准备好重试。

CREATE PROCEDURE SP2  -- or SP1
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    BEGIN TRANSACTION;
    BEGIN TRY
        -- Maximum number of retries
        DECLARE @VarCount int = 10;

        WHILE (@VarCount > 0)
        BEGIN
            SET @VarCount = @VarCount - 1;

            DECLARE @VarLockResult int;
            EXEC @VarLockResult = sp_getapplock
                @Resource = 'StagingTable_app_lock',
                -- this resource name should be the same in SP1 and SP2
                @LockMode = 'Exclusive',
                @LockOwner = 'Transaction',
                @LockTimeout = 60000,
                -- I'd set this timeout to be about twice the time
                -- you expect SP to run normally
                @DbPrincipal = 'public';

            IF @VarLockResult >= 0
            BEGIN
                -- Acquired the lock

                -- for SP2
                -- INSERT INTO StagingTable ...

                -- for SP1
                -- SELECT FROM StagingTable ...
                -- TRUNCATE StagingTable ...

                -- don't retry any more
                BREAK;
            END ELSE BEGIN
                -- wait for 5 seconds and retry
                WAITFOR DELAY '00:00:05';
            END;
        END;

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION;
        -- log error
    END CATCH;

END

此代码保证在任何给定时刻只有一个过程在使用临时表。没有并发。所有其他实例将等待。

显然,如果您尝试不通过这些 SP1 或 SP2(它们首先尝试获取锁)来访问 staging 表,那么这种访问将不会被阻止。

于 2020-02-05T11:00:02.960 回答
1

我会建议使用两个相同的临时表的解决方案。让我们将它们命名为 StageLoading 和 StageProcessing。
加载过程将有以下步骤:
1. 开始时两个表都是空的。
2. 我们将一些数据加载到 StageLoading 表中(我假设每个加载都是一个事务)。
3. 当 Elastic 作业启动时,它将执行以下操作:
- ALTER TABLE SWITCH 将所有数据从 StageLoading 移动到 StageProcessing。它将使 StageLoading 为空并为下一次加载做好准备。这是一个元数据操作,所以需要几毫秒并且它是完全阻塞的,所以将在加载之间完成。
- 将数据从 StageProcessing 加载到最终表格。
- 截断表 StageProcessing。
4. 现在我们为下一个 Elastic 工作做好了准备。

如果我们在 StageProcessing 不为空时尝试进行 SWITCH,则 ALTER 将失败,这意味着最后一个加载过程失败。

于 2020-02-06T00:33:38.007 回答
0

有没有办法在我们将数据复制到最终表中时锁定临时表,以便尝试写入它的 SP(从 ETL 管道调用)只会等到锁被释放?这可以通过使用事务或一些手动锁定命令来实现吗?

看起来您正在寻找一种比事务级别更广泛的机制。SQL Server/Azure SQL DB 有一个,它被称为应用程序锁

sp_getapplock

锁定应用程序资源。

放置在资源上的锁与当前事务或当前会话相关联。与当前事务关联的锁在事务提交或回滚时被释放。当会话注销时,与会话关联的锁将被释放。当服务器因任何原因关闭时,所有锁都会被释放。

可以使用 sp_releaseapplock 显式释放锁。当应用程序为同一个锁资源多次调用 sp_getapplock 时,必须调用相同次数的 sp_releaseapplock 才能释放锁。当使用事务锁所有者打开锁时,该锁会在事务提交或回滚时释放。

这基本上意味着您的 ETL 工具应该打开单个会话到数据库,完成后获取锁定和释放。尝试做任何事情之前的其他会话应该尝试获取锁(它们不能,因为它已经被占用了),等到它释放并继续工作。

于 2020-02-02T20:15:13.177 回答
0

假设您有一个出站工作

  • 将 OutboundProcessing BIT DEFAULT 0 添加到表中
  • 在作业中,SET OutboundProcessing = 1 WHERE OutboundProcessing = 0(声明行)
  • 对于 ETL,在获取数据的查询中合并 WHERE OutboundProcessing = 1(传输行)
  • 在 ETL 之后,DELETE FROM TABLE WHERE OutboundProcessing = 1(删除您传输的行)
  • 如果 ETL 失败,SET OutboundProcessing = 0 WHERE OutboundProcessing = 1
于 2020-02-07T16:01:45.440 回答
0

我总是喜欢“识别”我收到的每个文件。如果可以做到这一点,您可以在整个加载过程中关联给定文件中的记录。你没有说需要这个,但只是说。

但是,每个文件都有一个标识(应该只是一个 int/bigint 标识值),然后您可以从“模板”加载表动态创建任意数量的加载表。

  1. 当文件到达时,创建一个以文件 ID 命名的新加载表。
  2. 处理从加载到最终表格的数据。
  3. 删除正在处理的文件的加载表。

这有点类似于使用 2 个表(加载和暂存)的其他解决方案,但即使在该解决方案中,您仍然仅限于“加载”2 个文件(尽管您仍然只将一个文件应用于最终表?)

最后,尚不清楚您的“弹性作业”是否与实际的“负载”管道/处理分离,或者是否包含在内。作为一个工作,我假设它不包括在内,如果一个工作,你一次只能运行一个实例?因此,如果您一次只能将一个文件从加载移动到最终文件,为什么一次加载多个文件很重要?为什么急于加载文件?

于 2020-02-08T00:17:20.667 回答