在您遵循以下我的建议并假设我已经正确完成了周期的数学运算之前,如果您检查消息的频率低于每 20 分 9 秒一次,那么您最好在队列上启用重新驱动策略。
SQS 的“重新驱动策略”允许您在接收到阈值数量后将消息迁移到死信队列。AWS 允许的最大接收数为 1000,超过 14 天,每次接收大约需要 20 分钟。(为简单起见,假设您的工作不会错过读取队列消息的尝试。您可以调整数字以建立对失败的容忍度。)
如果您检查的频率高于此,您将需要实施以下解决方案。
您可以在处理消息时检查此“截止日期”(当作业即将到期时),如果消息已超过您放弃它们的时间,则将消息发送到死信队列。
添加到当前例程的伪代码:
- 调用GetQueueAttributes以获取队列的消息保留期的计数(以秒为单位)。
- 调用ReceiveMessage将消息从队列中拉出。确保明确请求 SentTimestamp 可见。
- Foreach 消息,
- 通过将邮件保留期添加到发送的时间戳来查找邮件的到期时间。
- 通过从消息的到期时间中减去您想要的时间量来创建您的截止日期。
- 将截止日期与当前时间进行比较。如果截止日期已过:
- 如果截止日期尚未过去:
这是 Powershell 中的示例实现:
$queueUrl = "https://sqs.amazonaws.com/0000/my-queue"
$deadLetterQueueUrl = "https://sqs.amazonaws.com/0000/deadletter"
# Get the message retention period in seconds
$messageRetentionPeriod = (Get-SQSQueueAttribute -AttributeNames "MessageRetentionPeriod" -QueueUrl $queueUrl).Attributes.MessageRetentionPeriod
# Receive messages from our queue.
$queueMessages = @(receive-sqsmessage -QueueUrl $queueUrl -WaitTimeSeconds 5 -AttributeNames SentTimestamp)
foreach($message in $queueMessages)
{
# The sent timestamp is in epoch time.
$sentTimestampUnix = $message.Attributes.SentTimestamp
# For powershell, we need to do some quick conversion to get a DateTime.
$sentTimestamp = ([datetime]'1970-01-01 00:00:00').AddMilliseconds($sentTimestampUnix)
# Get the expiration time by adding the retention period to the sent time.
$expirationTime = $sentTimestamp.AddDays($messageRetentionPeriod / 86400 )
# I want my cutoff date to be one hour before the expiration time.
$cutoffDate = $expirationTime.AddHours(-1)
# Check if the cutoff date has passed.
if((Get-Date) -ge $cutoffDate)
{
# Cutoff Date has passed, move to deadletter queue
Send-SQSMessage -QueueUrl $deadLetterQueueUrl -MessageBody $message.Body
remove-sqsmessage -QueueUrl $queueUrl -ReceiptHandle $message.ReceiptHandle -Force
}
else
{
# Cutoff Date has not passed. Retry job?
}
}
这将为您处理的每条消息增加一些开销。这还假设您的消息处理程序将在截止时间和到期时间之间接收消息。确保您的应用程序经常轮询以接收消息。