3

从 CodeReview.Stackexchange 重新发布,因为它只是伪代码

我正在建立一个任务处理器工作者角色。任务被插入到一个简单的数据库表(Guid、Json 字符串、状态)中,然后将 Guid 推送到一个天蓝色的消息队列中。然后,工作者角色将从队列中弹出消息,从数据库中检索 JSON 指令并处理该任务。

我试图确保我已经涵盖了幂等性和重试的所有极端情况,并提出了以下伪代码。

我的主要问题是我的“ProcessTask”阶段是非事务性的(HTTP Posts & Service Calls to external services),重复调用会很糟糕(Mkay)

有人可以仔细检查一下,看看是否有任何我没有发现的极端情况。

让我们假设数据库中存在具有以下信息的任务。

| Id   | TaskInstruction       | Status  | PopReceipt | 
+------+-----------------------+---------+------------+
| GUID | { ... some json ... } | Pending | NULL       |

以下伪代码涵盖以下情况。

  1. 工作人员检索消息,及时处理,更新数据库并删除消息。
  2. #1,除了它需要比 30 秒的隐身超时时间更长但仍然成功。
  3. 消息处理失败,需要重试。
  4. 消息多次处理失败,超出允许的最大出列数。

出于幂等性的原因,该计划是利用 Azure BlobStorage AquireLease 机制将关键代码片段用作文件锁。

伪代码

BlobAquireLease                                  //LOCK Only Worker Role Can Retrieve at a time
    var message = Queue.GetMessage(30);          // 30 second invisibility timeout
    var dbTask = DBSession.GetTask(message.Id);  //Retrieve the DB Entry
    If dbTask.Status != Pending OR Retrying
        //Crap another role is processing this task but has 
        //overshot the 30 second timeout and now my popreceipt 
        //supercedes his - put my popreceit in the DB, and do nothing more
        DBSession.UpdateTask(message.Id, newPopReceipt) // put the new pop receipt in the db
        BlobReleaseLease                                //Release the LOCK
        return;                                         //Get out of here
    EndIf

    //Otherwise, I'm ok to handle this task.
    DBSession.UpdateTask(message.Id, Status.Processing);
BlobReleaseLock

//At this point, the task is mine, and I've got 30 seconds to do stuff

TRY
    ProcessTask(dbtask);
    //Successful
    BlobAquireLease                                                //Grab the LOCK Again
        //Get from DB Again just incase I've 
        //over shot the 30 seconds and someone else has updated the popreceipt
        var dbTaskRefreshed = DBSession.GetTask(message.Id); 
        DBSession.UpdateTask(message.Id, Status.SUCCESS);          //Update the status in the DB
        Queue.DeleteMessage(message, dbTaskRefreshed.PopReceipt);  //Delete the message
    BlobReleaseLease
ENDTRY
CATCH
    //An Error Occured and the task couldn't be processed. 
    //need to update the database
    BlobAcquireLease                                               //Grab the LOCK again

        IF message.DequeueCount > MAX ALLOWED                      //Too many Attempts
            //Get from DB Again just incase I've 
            //over shot the 30 seconds and someone else has updated the popreceipt
            var dbTaskRefreshed = DBSession.GetTask(message.Id); 
            DBSession.UpdateTask(message.Id, Status.FAILED);
            Queue.DeleteMessage(message, dbTaskRefreshed.PopReceipt)
        ELSE
            //Haven't reached the max dequeue count yet, so just 
            //let the invisibility timeout elapse on it's own and someone will retry
            DBSession.UpdateTask(message.Id, Status.RETRYING);
        END IF
    BlobReleaseLease
ENDCATCH
4

0 回答 0