0

我现在有什么?

目前,我有一个配置了RetryAsync策略的客户端,该策略使用主地址并在发生故障时切换到故障转移地址。连接详细信息从机密管理器中读取。

services
    .AddHttpClient ("MyClient", client => client.BaseAddress = PlaceholderUri)
    .ConfigureHttpMessageHandlerBuilder (builder => {

        // loads settings from secret manager
        var settings = configLoader.LoadSettings().Result;

        builder.PrimaryHandler = new HttpClientHandler {
            Credentials = new NetworkCredential (settings.Username, settings.Password),
            AutomaticDecompression = DecompressionMethods.GZip
        };

        var primaryBaseAddress = new Uri (settings.Host);
        var failoverBaseAddress = new Uri (settings.DrHost);

        builder.AdditionalHandlers.Add (new PolicyHttpMessageHandler (requestMessage => {
            var relativeAddress = PlaceholderUri.MakeRelativeUri (requestMessage.RequestUri);
            requestMessage.RequestUri = new Uri (primaryBaseAddress, relativeAddress);

            return HttpPolicyExtensions.HandleTransientHttpError ()
                .RetryAsync ((result, retryCount) =>
                    requestMessage.RequestUri = new Uri (failoverBaseAddress, relativeAddress));
        }));
    });

我想达到什么目标?

一般来说

我的客户可以使用主服务或故障转移服务。当主服务器关闭时,使用故障转移直到主服务器备份。当两者都关闭时,我们会收到警报,并且可以通过秘密管理器动态更改服务地址。

在代码中

现在我还想介绍一个CircuitBreakerPolicy并将这两个策略链接在一起。我正在寻找一种封装的配置,并且在客户端级别而不是在使用该客户端的类上处理故障。

场景解释

让我们假设有一个断路器策略包装在具有单个客户端的重试策略中。

断路器配置为在主基地址上的瞬态错误尝试失败 3次后断开电路60 秒。- 地址从主地址更改为故障转移。OnBreak

策略配置为处理BrokenCircuitException并重试一次,并将地址从主更改为故障转移以继续。

  1. 主地址请求 - 500 代码
  2. 主地址请求 - 500 代码
  3. 主地址请求 - 500 代码(连续 3 次失败)
  4. 断路60秒
  5. 对主地址的请求 -BrokenCircuitException被重试策略捕获,调用故障转移
  6. 对主地址的请求 -BrokenCircuitException被重试策略捕获,调用故障转移
  7. 对主地址的请求 -BrokenCircuitException被重试策略捕获,调用故障转移
  8. 对主地址的请求 -BrokenCircuitException被重试策略捕获,调用故障转移
  9. (60 秒后)电路半开 - (此处可以再断开 60 秒或打开 - 假设打开)
  10. 主地址请求 - 200 代码

如本文所述有一个解决方案是使用包装在回退中的断路器,但正如您在那里看到的,默认和回退的逻辑是在类中实现的,而不是在客户端级别上实现的。

我想

public class OpenExchangeRatesClient
{
    private readonly HttpClient _client;
    private readonly Policy _policy;
    public OpenExchangeRatesClient(string apiUrl)
    {
        _client = new HttpClient
        {
            BaseAddress = new Uri(apiUrl),
        };

        var circuitBreaker = Policy
            .Handle<Exception>()
            .CircuitBreakerAsync(
                exceptionsAllowedBeforeBreaking: 2,
                durationOfBreak: TimeSpan.FromMinutes(1)
            );

        _policy = Policy
            .Handle<Exception>()
            .FallbackAsync(() => GetFallbackRates())
            .Wrap(circuitBreaker);
    }

    public Task<ExchangeRates> GetLatestRates()
    {
        return _policy
            .ExecuteAsync(() => CallRatesApi());
    }

    public Task<ExchangeRates> CallRatesApi()
    {
        //call the API, parse the results
    }

    public Task<ExchangeRates> GetFallbackRates()
    {
        // load the rates from the embedded file and parse them
    }
}

改写为

public class OpenExchangeRatesClient 
{
    private readonly HttpClient _client;
    public OpenExchangeRatesClient (IHttpClientFactory clientFactory) {
        _client = clientFactory.CreateClient ("MyClient");
    }

    public Task<ExchangeRates> GetLatestRates () {
        return _client.GetAsync ("/rates-gbp-usd");
    }
}

我读了什么?

我尝试了什么?

我尝试了几种不同的场景来链接断路器策略并将其与重试策略相结合,以在启动文件中的客户端杠杆上实现预期目标。最后一个状态如下。这些策略按照重试能够捕获 a 的顺序进行包装BrokenCircuitException,但情况并非如此。在消费者类上抛出异常,这不是预期的结果。虽然RetryPolicy被触发了,但是还是抛出了消费者类上的异常。

var retryPolicy = GetRetryPolicy();
var circuitBreaker = GetCircuitBreakerPolicy();

var policyWraper = Policy.WrapAsync(retryPolicy, circuitBreaker);

services
    .AddHttpClient("TestClient", client => client.BaseAddress = GetPrimaryUri())
    .AddPolicyHandler(policyWraper);

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(
            3,
            TimeSpan.FromSeconds(45),
            OnBreak,
            OnReset, 
            OnHalfOpen);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return Policy<HttpResponseMessage>
        .Handle<Exception>()
        .RetryAsync(1, (response, retryCount) =>
        {
            Debug.WriteLine("Retries on broken circuit");
        });
}

我省略了方法,OnBreak因为它们只是打印一些消息。OnResetOnHalfOpen

更新:从控制台添加了日志。

Circuit broken (after 3 attempts)
Retries on broken
Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll 
Retries on broken circuit
Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll

'CircuitBreakerPolicy.exe'(CoreCLR:clrhost):加载'C:\ Program Retries on broken circuit 异常抛出:System.Private.CoreLib.dll中的'System.AggregateException'

更新 2:使用配置了策略的客户端向类添加了参考 URL

更新 3:该项目已更新,以便WeatherService2.Get以所需的方式实施工作:当主要服务不可用时,电路中断,使用故障转移服务直到电路关闭。这将是这个问题的答案,但是我想探索一个解决方案,WeatherService.GetStartup.

使用客户端引用。使用类引用项目

在上面的日志中可以看到Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll断路器抛出了哪些 - 这是不期望的,因为有重试包装断路器。

4

2 回答 2

1

我已经下载了你的项目并使用它,所以这是我的观察:

阻塞与非阻塞

  • 因为您的代码使用阻塞异步调用 ( .Result) 这就是您看到的原因 AggregateException
public IEnumerable<WeatherForecast> Get()
{
    HttpResponseMessage response = null;
    try
    {
        response = _client.GetAsync(string.Empty).Result; //AggregateException  
    }
    catch (Exception e)
    {
        Debug.WriteLine($"{e.Message}");
    }
    ...
}
  • 为了打开你需要使用InnerExceptionAggregateExceptionawait
public async Task<IEnumerable<WeatherForecast>> Get()
{
    HttpResponseMessage response = null;
    try
    {
        response = await _client.GetAsync(string.Empty); //BrokenCircuitException
    }
    catch (Exception e)
    {
        Debug.WriteLine($"{e.Message}");
    }
    ...
}

升级

每当您将策略包装到另一个策略中时,都可能会发生升级。这意味着如果内部无法处理问题,那么它会将相同的问题传播到外部,外部可能会也可能无法处理。如果最外层没有处理问题,那么(大部分时间)原始异常将被抛出给弹性策略的消费者(这是策略的组合)。

在这里您可以找到有关升级的更多详细信息。

让我们在您的案例中回顾一下这个概念:

var policyWrapper = Policy.WrapAsync(retryPolicy, circuitBreaker);

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(3, TimeSpan.FromSeconds(45), ...);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return Policy<HttpResponseMessage>
        .Handle<Exception>()
        .RetryAsync(1, ...);
}
  1. 初始请求(1.尝试)针对https://httpstat.us/500
  2. 它返回 500,这会将连续瞬态故障从 0 增加到 1
  3. CB 升级问题重试
  4. 重试未处理状态 500,因此未触发重试
  5. httpClient 返回一个HttpResponseMessage状态InternalServerError码。

让我们修改重试策略以处理瞬态 http 错误:

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(3, TimeSpan.FromSeconds(45), ...);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .Or<Exception>()
        .RetryAsync(1, ...);
}
  1. 初始请求(1.尝试)针对https://httpstat.us/500
  2. 它返回 500,这会将连续瞬态故障从 0 增加到 1
  3. CB 升级问题重试
  4. Retry 正在处理状态 500,因此 retry 立即发出另一次尝试
  5. 第一次重试请求(2.尝试)针对https://httpstat.us/500
  6. 它返回 500,这会将连续瞬态故障从 1 增加到 2
  7. CB 升级问题重试
  8. 即使重试正在处理状态 500,它也不会触发,因为它达到了重试计数 (1)
  9. httpClient 返回一个HttpResponseMessage带有InternalServerErrorStatusCode 的值。

现在,让我们将连续失败计数从 3 降低到 1 并BrokenCircuitException明确处理:

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(1, TimeSpan.FromSeconds(45), ...);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .Or<BrokenCircuitException>()
        .RetryAsync(1, ...);
}
  1. 初始请求(1.尝试)针对https://httpstat.us/500
  2. 它返回 500,这会将连续瞬态故障从 0 增加到 1
  3. 断路器打开,因为它达到了预定义的阈值
  4. CB 升级问题重试
  5. Retry 正在处理状态 500,因此 retry 立即发出另一次尝试
  6. 第一次重试请求(2.尝试)针对https://httpstat.us/500
  7. CB 阻止此调用,因为它已损坏
  8. CB抛出一个BrokenCircuitException
  9. 即使 Retry 正在处理BrokenCircuitException它也不会触发,因为它达到了它的重试计数 (1)
  10. 重试会抛出原​​始异常 ( BrokenCircuitException),因此 httpClientGetAsync会抛出该异常。

最后让我们将 retryCount 从 1 增加到 2:

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(1, TimeSpan.FromSeconds(45), ...);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .Or<BrokenCircuitException>()
        .RetryAsync(2, ...);
}
  1. 初始请求(1.尝试)针对https://httpstat.us/500
  2. 它返回 500,这会将连续瞬态故障从 0 增加到 1
  3. 断路器打开,因为它达到了预定义的阈值
  4. CB 升级问题重试
  5. Retry 正在处理状态 500,因此 retry 立即发出另一次尝试
  6. 第一次重试请求(2.尝试)针对https://httpstat.us/500
  7. CB 阻止此调用,因为它已损坏
  8. CB抛出一个BrokenCircuitException
  9. Retry 正在处理BrokenCircuitException,它没有超过它的 retryCount,所以它立即发出另一个尝试
  10. 发出第二次重试请求(3.尝试)https://httpstat.us/500
  11. CB 阻止此调用,因为它已损坏
  12. CB抛出一个BrokenCircuitException
  13. 即使 Retry 正在处理BrokenCircuitException它也不会触发,因为它达到了它的重试计数 (2)
  14. 重试将抛出原始异常 ( BrokenCircuitException),因此 httpClientGetAsync将抛出该异常。

我希望这个练习可以帮助您更好地理解如何创建弹性策略,通过升级问题来组合多个策略。

于 2021-03-25T10:44:24.677 回答
0

我已经查看了您的替代解决方案,该解决方案与我在上一篇文章中讨论的设计问题相同。

public WeatherService2(IHttpClientFactory clientFactory, IEnumerable<IAsyncPolicy<HttpResponseMessage>> policies)
{
    _primaryClient = clientFactory.CreateClient("PrimaryClient");
    _failoverClient = clientFactory.CreateClient("FailoverClient");
    _circuitBreaker = policies.First(p => p.PolicyKey == "CircuitBreaker");

    _policy = Policy<HttpResponseMessage>
        .Handle<Exception>()
        .FallbackAsync(_ => CallFallbackForecastApi())
        .WrapAsync(_circuitBreaker);
}

public async Task<string> Get()
{
    var response = await _policy.ExecuteAsync(async () => await CallForecastApi());

    if (response.IsSuccessStatusCode) 
        return response.StatusCode.ToString();

    response = await CallFallbackForecastApi();
    return response.StatusCode.ToString();
}

您的后备策略永远不会被触发。

  1. HttpClient 收到 statusCode 500 的响应
  2. 断路器断开
  3. HttpResponseMessageCB 将statusCode 500传播到外部策略
  4. 回退不会触发,因为它是为异常设置的Handle<Exception>()
  5. 策略返回HttpResponseMessagestatusCode 500
  6. 您的代码手动检查响应,然后手动调用回退。

如果您将政策更改为:

_policy = Policy
    .HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
    .Or<Exception>()
    .FallbackAsync(_ => CallFallbackForecastApi())
    .WrapAsync(_circuitBreaker);

那么就不需要手动回退了。

  1. HttpClient 收到 statusCode 500 的响应
  2. 断路器断开
  3. HttpResponseMessageCB 将statusCode 500传播到外部策略
  4. 回退触发,因为它也是为不成功的状态代码设置的
  5. HttpClient 收到 statusCode 200 的响应
  6. 策略返回HttpResponseMessagestatusCode 500

您还需要了解一件更重要的事情。前面的代码之所以有效,是因为您在没有断路器策略的情况下注册了 HttpClients 。

这意味着 CB未附加到 HttpClient。因此,如果您像这样更改代码:

public async Task<HttpResponseMessage> CallForecastApi()
    => await _primaryClient.GetAsync("https://httpstat.us/500/");

public async Task<HttpResponseMessage> CallFallbackForecastApi()
    => await _primaryClient.GetAsync("https://httpstat.us/200/");

那么即使 CircuitBreaker 在第一次尝试后会打开,CallFallbackForecastApi也不会抛出BrokenCircuitException.

但是,如果您像这样将 CB 附加到 HttpClient:

services
    .AddHttpClient("PrimaryClient", client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(GetCircuitBreakerPolicy());

然后你WeatherService2像这样简化:

private readonly HttpClient _primaryClient;
private readonly IAsyncPolicy<HttpResponseMessage> _policy;

public WeatherService2(IHttpClientFactory clientFactory)
{
    _primaryClient = clientFactory.CreateClient("PrimaryClient");
    _policy = Policy
        .HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
        .Or<Exception>()
        .FallbackAsync(_ => CallFallbackForecastApi());
}

那么它将惨遭失败BrokenCircuitException


如果你WeatherService2看起来像这样:

public class WeatherService2 : IWeatherService2
{
    private readonly HttpClient _primaryClient;
    private readonly HttpClient _secondaryClient;
    private readonly IAsyncPolicy<HttpResponseMessage> _policy;
    public WeatherService2(IHttpClientFactory clientFactory)
    {
        _primaryClient = clientFactory.CreateClient("PrimaryClient");
        _secondaryClient = clientFactory.CreateClient("FailoverClient");

        _policy = Policy
            .HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
            .Or<Exception>()
            .FallbackAsync(_ => CallFallbackForecastApi());
    }

    public async Task<string> Get()
    {
        var response = await _policy.ExecuteAsync(async () => await CallForecastApi());
        return response.StatusCode.ToString();
    }

    public async Task<HttpResponseMessage> CallForecastApi()
        => await _primaryClient.GetAsync("https://httpstat.us/500/");

    public async Task<HttpResponseMessage> CallFallbackForecastApi()
        => await _secondaryClient.GetAsync("https://httpstat.us/200/");
}

那么只有当PrimaryClientFailoverClient不同的断路器时它才能正常工作。

services
    .AddHttpClient("PrimaryClient", client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(GetCircuitBreakerPolicy());

services
    .AddHttpClient("FailoverClient", client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(GetCircuitBreakerPolicy());

如果他们将共享同一个断路器,那么第二次调用将再次失败,并带有BrokenCircuitException.

var cbPolicy = GetCircuitBreakerPolicy();

services
    .AddHttpClient("PrimaryClient", client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(cbPolicy);

services
    .AddHttpClient("FailoverClient", client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(cbPolicy);
于 2021-04-08T08:48:30.550 回答