从 CodeReview.Stackexchange 重新发布,因为它只是伪代码
我正在建立一个任务处理器工作者角色。任务被插入到一个简单的数据库表(Guid、Json 字符串、状态)中,然后将 Guid 推送到一个天蓝色的消息队列中。然后,工作者角色将从队列中弹出消息,从数据库中检索 JSON 指令并处理该任务。
我试图确保我已经涵盖了幂等性和重试的所有极端情况,并提出了以下伪代码。
我的主要问题是我的“ProcessTask”阶段是非事务性的(HTTP Posts & Service Calls to external services),重复调用会很糟糕(Mkay)
有人可以仔细检查一下,看看是否有任何我没有发现的极端情况。
让我们假设数据库中存在具有以下信息的任务。
| Id | TaskInstruction | Status | PopReceipt |
+------+-----------------------+---------+------------+
| GUID | { ... some json ... } | Pending | NULL |
以下伪代码涵盖以下情况。
- 工作人员检索消息,及时处理,更新数据库并删除消息。
- #1,除了它需要比 30 秒的隐身超时时间更长但仍然成功。
- 消息处理失败,需要重试。
- 消息多次处理失败,超出允许的最大出列数。
出于幂等性的原因,该计划是利用 Azure BlobStorage AquireLease 机制将关键代码片段用作文件锁。
伪代码
BlobAquireLease //LOCK Only Worker Role Can Retrieve at a time
var message = Queue.GetMessage(30); // 30 second invisibility timeout
var dbTask = DBSession.GetTask(message.Id); //Retrieve the DB Entry
If dbTask.Status != Pending OR Retrying
//Crap another role is processing this task but has
//overshot the 30 second timeout and now my popreceipt
//supercedes his - put my popreceit in the DB, and do nothing more
DBSession.UpdateTask(message.Id, newPopReceipt) // put the new pop receipt in the db
BlobReleaseLease //Release the LOCK
return; //Get out of here
EndIf
//Otherwise, I'm ok to handle this task.
DBSession.UpdateTask(message.Id, Status.Processing);
BlobReleaseLock
//At this point, the task is mine, and I've got 30 seconds to do stuff
TRY
ProcessTask(dbtask);
//Successful
BlobAquireLease //Grab the LOCK Again
//Get from DB Again just incase I've
//over shot the 30 seconds and someone else has updated the popreceipt
var dbTaskRefreshed = DBSession.GetTask(message.Id);
DBSession.UpdateTask(message.Id, Status.SUCCESS); //Update the status in the DB
Queue.DeleteMessage(message, dbTaskRefreshed.PopReceipt); //Delete the message
BlobReleaseLease
ENDTRY
CATCH
//An Error Occured and the task couldn't be processed.
//need to update the database
BlobAcquireLease //Grab the LOCK again
IF message.DequeueCount > MAX ALLOWED //Too many Attempts
//Get from DB Again just incase I've
//over shot the 30 seconds and someone else has updated the popreceipt
var dbTaskRefreshed = DBSession.GetTask(message.Id);
DBSession.UpdateTask(message.Id, Status.FAILED);
Queue.DeleteMessage(message, dbTaskRefreshed.PopReceipt)
ELSE
//Haven't reached the max dequeue count yet, so just
//let the invisibility timeout elapse on it's own and someone will retry
DBSession.UpdateTask(message.Id, Status.RETRYING);
END IF
BlobReleaseLease
ENDCATCH