6

我编写了一个简单的(我认为...)速率限制器,以将事件驱动系统保持在我们许可的 API 命中限制之下。由于某种原因,它有时会在发送 400-500 个请求后卡住。

我最好的想法是我搞砸了等待功能,所以在某些情况下它永远不会返回,但我无法找到有缺陷的逻辑。另一个想法是我搞砸了导致问题的异步/任务互操作。它总是先工作,然后再工作。单个实例ApiRateLimiter在多个组件之间共享,以便在系统范围内遵守命中限制。

type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>

type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =

let requestLimit = Math.Max(limitCount,1)

let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox -> 

    let rec waitUntilUnderLimit (recentRequestsTimeSent: seq<DateTimeOffset>) = async{
        let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
        let requestsWithinLimit = 
            recentRequestsTimeSent 
            |> Seq.filter(fun x -> x >= cutoffTime)
            |> Seq.toList

        if requestsWithinLimit.Length >= requestLimit then
            let! _ = Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
            return! waitUntilUnderLimit requestsWithinLimit
        else
            return requestsWithinLimit
    }

    let rec messageLoop (mostRecentRequestsTimeSent: seq<DateTimeOffset>) = async{
        // read a message
        let! keyedRequest,replyChannel = inbox.Receive()
        // wait until we are under our rate limit
        let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent

        let rightNow = DateTimeOffset.UtcNow

        let! response =
            keyedRequest.Request
            |> httpClient.SendAsync
            |> Async.AwaitTask

        replyChannel.Reply { Key = keyedRequest.Key; Response = response }

        return! messageLoop (seq {
            yield rightNow
            yield! remainingRecentRequests
        })
    }

    // start the loop
    messageLoop (Seq.empty<DateTimeOffset>)
)            

member this.QueueApiRequest keyedRequest =
    async {
        return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
    } |> Async.StartAsTask

一些请求很大并且需要一点时间,但没有什么可以导致我在这件事上看到的请求发送完全死亡。

感谢您花点时间查看!

4

1 回答 1

3

我注意到您正在建立一个使用 seq 发送请求的最近时间的列表:

seq {
    yield rightNow
    yield! remainingRecentRequests
}

因为 F# 序列是惰性的,所以这会产生一个枚举器,当被询问其下一个值时,它将首先产生一个值,然后将开始迭代其子 seq 并产生一个值。每次产生一个新请求时,都会添加一个新的枚举器——但旧的枚举器什么时候处理?你会认为它们一旦过期就会被处理掉,也就是说,一旦Seq.filter进入waitUntilUnderLimit返回假。但想一想:F# 编译器如何知道过滤条件一旦为假就永远为假?如果没有深入的代码分析(编译器不会这样做),它就做不到。因此,“旧”序列永远不会被垃圾收集,因为它们仍然被保留以防万一需要它们。我不能 100% 确定这一点,因为我没有测量您的代码的内存使用情况,但是如果您要测量您的ApiRateLimiter实例的内存使用情况,我敢打赌,您会看到它稳步增长而不会下降。

我还注意到您在 seq的前面添加了新项目。这与 F# 列表使用的语义完全相同,但是对于列表,没有要分配的 IEnumerable 对象,一旦列表项不符合List.filter条件,它将被丢弃。因此,我重写了您的代码以使用最近时间的列表而不是 seq,并且为了提高效率,我还进行了另一项更改:因为您创建列表的方式保证了它将被排序,最近的事件排在最前面,最旧的排在最后,我换成List.filter. List.takeWhile这样,当第一个日期早于截止日期时,它将停止检查旧日期。

通过此更改,您现在应该拥有实际过期的旧日期,并且您的班级的内存使用量ApiRateLimiter应该会波动但保持不变。(每次waitUntilUnderLimit调用它都会创建新列表,因此会产生一些 GC 压力,但这些都应该在第 0 代)。我不知道这是否会解决您的挂起问题,但这是我在您的代码中看到的唯一问题。

顺便说一句,我也用更简单的替换了你的let! _ = Async.Sleep 100do! Async.Sleep 100。这里没有效率提升,但没有必要使用let! _ =等待Async<unit>返回;这正是do!关键字的用途。

type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>

type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =

    let requestLimit = Math.Max(limitCount,1)

    let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox -> 

        let rec waitUntilUnderLimit (recentRequestsTimeSent: DateTimeOffset list) = async{
            let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
            let requestsWithinLimit = 
                recentRequestsTimeSent 
                |> List.takeWhile (fun x -> x >= cutoffTime)

            if List.length requestsWithinLimit >= requestLimit then
                do! Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
                return! waitUntilUnderLimit requestsWithinLimit
            else
                return requestsWithinLimit
        }

        let rec messageLoop (mostRecentRequestsTimeSent: DateTimeOffset list) = async{
            // read a message
            let! keyedRequest,replyChannel = inbox.Receive()
            // wait until we are under our rate limit
            let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent

            let rightNow = DateTimeOffset.UtcNow

            let! response =
                keyedRequest.Request
                |> httpClient.SendAsync
                |> Async.AwaitTask

            replyChannel.Reply { Key = keyedRequest.Key; Response = response }

            return! messageLoop (rightNow :: remainingRecentRequests)
        }

        // start the loop
        messageLoop []
    )            

    member this.QueueApiRequest keyedRequest =
        async {
            return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
        } |> Async.StartAsTask
于 2018-04-03T04:49:39.107 回答