我编写了一个简单的(我认为...)速率限制器,以将事件驱动系统保持在我们许可的 API 命中限制之下。由于某种原因,它有时会在发送 400-500 个请求后卡住。
我最好的想法是我搞砸了等待功能,所以在某些情况下它永远不会返回,但我无法找到有缺陷的逻辑。另一个想法是我搞砸了导致问题的异步/任务互操作。它总是先工作,然后再工作。单个实例ApiRateLimiter
在多个组件之间共享,以便在系统范围内遵守命中限制。
type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>
type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =
let requestLimit = Math.Max(limitCount,1)
let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox ->
let rec waitUntilUnderLimit (recentRequestsTimeSent: seq<DateTimeOffset>) = async{
let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
let requestsWithinLimit =
recentRequestsTimeSent
|> Seq.filter(fun x -> x >= cutoffTime)
|> Seq.toList
if requestsWithinLimit.Length >= requestLimit then
let! _ = Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
return! waitUntilUnderLimit requestsWithinLimit
else
return requestsWithinLimit
}
let rec messageLoop (mostRecentRequestsTimeSent: seq<DateTimeOffset>) = async{
// read a message
let! keyedRequest,replyChannel = inbox.Receive()
// wait until we are under our rate limit
let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent
let rightNow = DateTimeOffset.UtcNow
let! response =
keyedRequest.Request
|> httpClient.SendAsync
|> Async.AwaitTask
replyChannel.Reply { Key = keyedRequest.Key; Response = response }
return! messageLoop (seq {
yield rightNow
yield! remainingRecentRequests
})
}
// start the loop
messageLoop (Seq.empty<DateTimeOffset>)
)
member this.QueueApiRequest keyedRequest =
async {
return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
} |> Async.StartAsTask
一些请求很大并且需要一点时间,但没有什么可以导致我在这件事上看到的请求发送完全死亡。
感谢您花点时间查看!