7

我正在设计一个使用我无法控制的外部 api 的 .net 核心 web api。我找到了一些关于堆栈溢出的出色答案,这些答案允许我在使用 semaphoreslim 的同一线程中限制我对这个外部 API 的请求。我想知道如何最好地将这种限制扩展到应用程序范围,而不是仅仅限制特定的任务列表。我一直在学习 HttpMessageHandlers,这似乎是一种拦截所有传出消息并应用节流的可能方法。但我担心线程安全和我可能不理解的锁定问题。我包括我当前的节流代码,并希望这可能有助于理解我正在尝试做的事情,但跨越多个线程,并且不断添加任务而不是预定义的任务列表。

private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
        var rtn = new List<PagedResultResponse>();
        var allTasks = new List<Task>();
        var throttler = new SemaphoreSlim(initialCount: throttle);
        foreach (var page in pages)
        {
            await throttler.WaitAsync();
            allTasks.Add(
                Task.Run(async () =>
                {
                    try
                    {
                        var result = await GetPagedResult(client, url, page);
                        return result;
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
        }
        await Task.WhenAll(allTasks);
        foreach (var task in allTasks)
        {
            var result = ((Task<PagedResultResponse>)task).Result;
            rtn.Add(result);
        }
        return rtn;
}
4

1 回答 1

56

概念问题

简单的实现

所以 aThrottlingDelegatingHandler可能看起来像这样:

public class ThrottlingDelegatingHandler : DelegatingHandler
{
    private SemaphoreSlim _throttler;

    public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
    {
        _throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        await _throttler.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            _throttler.Release();
        }
    }
}

创建并维护一个实例作为单例:

int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)); 

将其应用于您要通过其并行限制调用DelegatingHandler的所有实例:HttpClient

HttpClient throttledClient = new HttpClient(throttle);

HttpClient不需要是单例:只有throttle实例可以。

为简洁起见,我省略了 Dot Net Core DI 代码,但您可以在ThrottlingDelegatingHandler.Net Core 的容器中注册单例实例,在使用点通过 DI 获取该单例,然后在您构造的 s 中使用它,HttpClient如上所示。

但:

更好的实现:使用 HttpClientFactory (.NET Core 2.1+)

以上仍然引出了你将如何管理HttpClient生命周期的问题:

  • Singleton (app-scoped)HttpClient接收 DNS 更新。您的应用程序将不知道 DNS 更新,除非您杀死并重新启动它(可能不受欢迎)。
  • using (HttpClient client = ) { }另一方面,频繁创建和释放模式会导致套接字耗尽

的设计目标之一HttpClientFactory是管理HttpClient实例及其委托处理程序的生命周期,以避免这些问题。

在 .NET Core 2.1 中,您可以使用HttpClientFactory将其全部连接ConfigureServices(IServiceCollection services)Startup类中,如下所示:

int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));

services.AddHttpClient("MyThrottledClient")
    .AddHttpMessageHandler<ThrottlingDelegatingHandler>();

(这里的“MyThrottledClient”是一种命名客户端方法,只是为了使这个示例简短;类型化客户端避免使用字符串命名。)

在使用点,IHttpClientFactory通过 DI ( reference ) 获取一个,然后调用

var client = _clientFactory.CreateClient("MyThrottledClient");

获取HttpClient预先配置了单例的实例ThrottlingDelegatingHandler

通过以这种方式获得的实例的所有调用HttpClient都将被限制(通常在整个应用程序中)到最初配置的int maxParallelism.

HttpClientFactory 神奇地处理了所有HttpClient生命周期问题。

更好的实现:使用 Polly 和 IHttpClientFactory 来获得所有这些“开箱即用”

Polly与 IHttpClientFactory 深度集成,并且 Polly 还提供Bulkhead 策略,该策略通过相同的 SemaphoreSlim 机制作为并行节流阀

因此,作为手动滚动 a 的替代方法ThrottlingDelegatingHandler,您还可以使用开箱即用的 IHttpClientFactory 的 Polly Bulkhead 策略。在你的Startup课堂上,简单地说:

int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);

services.AddHttpClient("MyThrottledClient")
    .AddPolicyHandler(throttler);

HttpClient如前所述,从 HttpClientFactory 中获取预配置的实例。和以前一样,通过这样一个“MyThrottledClient”HttpClient实例的所有调用都将被并行限制到配置的maxParallelism.

Polly Bulkhead 策略还提供了配置您希望同时允许多少个操作“排队”为主信号量中的执行槽的能力。因此,例如:

var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);

当如上配置为 时HttpClient,将允许 10 个并行 http 调用,以及最多 100 个对执行槽的“队列”的 http 调用。这可以通过防止下游系统出现故障导致上游排队调用的资源过度膨胀,从而为高吞吐量系统提供额外的弹性。

要将 Polly 选项与 HttpClientFactory 一起使用,请拉入Microsoft.Extensions.Http.PollyPollynuget 包。

参考:Polly deep doco on Polly 和 IHttpClientFactory隔板政策


附录重新任务

该问题使用Task.Run(...)并提到:

使用外部 api 的 .net 核心 web api

和:

不断添加任务,而不是预先定义的任务列表。

如果您的 .net 核心 web api 每次请求仅使用一次外部 API ,.net 核心 web api 处理,并且您采用本答案其余部分中讨论的方法,则无需将下游外部 http 调用卸载到新Task的只会在额外的实例和线程切换Task.Run(...)中产生开销。Task点网核心将已经在线程池的多个线程上运行传入请求。

于 2018-08-28T08:12:24.983 回答