1

我当前的进程以以下方式运行:

1.) 用户在前端应用程序中输入 URL 进行分析

2.) 前端验证 URL 并在包含 URL 属性的表中创建 URL 记录

3.) 前端在表中创建/更新一行,跟踪 URL 所处的处理阶段(每个 URL 都有自己的内部 ID)

3.A) 状态码更新为“排队”状态

---- 表格定义:

ID INT PRIMARY KEY,
StatusCode INT,
StatusDescription VARCHAR(MAX),
IsInitial BIT,
LastUpdated DATETIME

4.) 前端向包含提交 URL 的内部 ID 的 Azure 存储队列发送消息

第一条消息发送到队列后------------>

4.A) 在 UI 中创建一个对象供用户单击(以“刷新”数据)

4.B)用户在创建的对象上单击(这很可能会发生)一旦创建(如果验证则立即)

4.C) 另一条消息被发送到包含 URL ID 的队列

<--------------------------------

5.) 一个正在运行的 azure webjob(后台任务)不断拾取这些消息并开始处理

6.) 网络作业确定此 URL 是否已准备好进行处理

.....它准备好开始处理如果

  • 它是新的(LastUpdated 字段为空)
  • 附加到项目的状态代码表示错误
  • 距离上次更新已 15 分钟

.....如果它退出

  • 消息中的 ID 无效
  • 附加的状态代码表示它目前正在处理中
  • 距离 LastUpdated 不到 15 分钟

一旦确定被认为可以继续...

  • 如果是新的,webjob 会将 LastUpdated 更新为 Datetime.Now
  • 在流程的每个步骤开始时,状态代码都会更新以反映这一点
  • 在该过程的最后,LastUpdated 被更新为当前时间

  • 一个 try catch 围绕着这个过程

a.) 如果过程出错,则更新状态代码以反映

b.) 一条新消息被推入队列进行重试

确定 URL 是否可以解析的函数:

    private bool IsReadyToParse(int [ID])
    {
        using (var db = EntityFactory.GetInstance())
        {
            var item = db.ProcessStatus.FirstOrDefault(x => x.ID == [ID]);

            if (item == null || item.StatusCode > 1)
            {
                return false;
            }

            if (item.StatusCode == (int)ProcessStatusEnum.Error || item.LastUpdated == null)
            {
                item.LastUpdated = DateTime.Now;
                db.Entry(item).State = EntityState.Modified;
                db.SaveChanges();
                return true;
            }

            return ((DateTime)item.LastUpdated).AddMinutes(15) < DateTime.Now;
        }
    }

队列消息通过这个函数进入:

     // This function will get triggered/executed when a new message is written 
    // on an Azure Queue
    public static void ProcessQueueMessage([QueueTrigger("[queue]")] QueueItem item, TextWriter log)
    {
        Console.WriteLine("Item found! Starting services [Id: {0}]", item.ID);

        Agent agent = new Agent([ID], log);
        agent.StartProcessing();

        log.WriteLine([Item]);
    }

...现在的问题是,这个连续运行的 webjob 一次可以接收多条消息(我想将其扩展到更多的 webjobs 从同一个队列中读取)

如何确定 IsReadyToParse() 函数实际上反映了当前的处理状态?

如果数据库即将将状态代码更新为“正在处理”,但另一个线程刚刚读取了状态代码并给出了 OK 以继续该过程怎么办?

4

2 回答 2

0

这是我的基本解决方案,开发人员测试有限……会随着我的进展而更新。

......使用这个存储过程而不是 IsReadyToParse()

CREATE PROCEDURE dbo.usp_getIsReadyForProcess
@[ID] INT 
AS
BEGIN
 BEGIN TRY

  SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
  BEGIN TRANSACTION

    DECLARE @lastUpdated DATETIME
    DECLARE @statusCode INT

    -- LOCK ROW UNTIL END OF TRANSACTION
    SET @lastUpdated = (SELECT LastUpdated FROM dbo.ProcessStatus WITH (ROWLOCK, HOLDLOCK) WHERE [ID] = @[ID])
    SET @statusCode = (SELECT StatusCode FROM dbo.ProcessStatus WHERE [ID] = @[ID])

    DECLARE @isReady BIT

    --If there is no row count
    IF @@ROWCOUNT = 0
    BEGIN
        SET @isReady = 0
    END

    -- If video is already in process
    ELSE IF @statusCode > 1
    BEGIN
        SET @isReady = 0
    END

    -- If this is the first time it is getting parsed
    ELSE IF @lastUpdated IS NULL
    BEGIN
        SET @isReady = 1

        --Update datetime field
        UPDATE dbo.ProcessStatus 
        SET LastUpdated = GETDATE()
        WHERE [ID] = @[ID]
    END

    -- If is isnt the initial parse and hasnt been 15 minutes yet
    ELSE IF GETDATE() < DATEADD(MINUTE, 15, @lastUpdated)
    BEGIN
        SET @isReady = 0
    END


    -- Anything else, and its a go 
    ELSE 
    BEGIN
        SET @isReady = 1
    END



    -- If were ready to start, update the status code
    IF @isReady = 1
    BEGIN
        UPDATE dbo.ProcessStatus 
        SET StatusCode = 2 
        WHERE [ID] = @[ID]
    END

 COMMIT TRANSACTION

 SELECT @isReady

 END TRY    
 BEGIN CATCH
    -- If there was any type of error
    ROLLBACK
    SELECT 0
 END CATCH 
END
于 2015-02-22T21:21:59.443 回答
0

这是一种可能的方法,类似于 WebJobs SDK 在内部所做的,以防止多个 webjob 函数同时处理相同的 blob 触发器。

当函数从队列中提取消息时,创建一个与消息中的 ID 同名的 blob。Blob 的内容是处理的状态(Done 或 InProgress)。当一个函数想要处理具有该 ID 的消息时,它必须对该 blob 进行租用——这保证了线程安全。然后:

  • 如果它不能获取最少,则其他人正在处理消息 => 丢弃队列消息。
  • 如果它获得租约但状态为“完成”,则有人已经处理了消息 => 丢弃队列消息。
  • 如果它获得租约并且状态为“进行中”,则有人尝试处理该消息但无法完成 => 使用该消息并再次处理。

如果处理一条消息可能需要超过 60 秒,您将需要一些额外的代码来更新 blob 租约,否则它将过期并且其他人可以取走它。

于 2015-02-23T17:27:02.200 回答