1

我们正在使用 Azure Cosmos DB 的 MongoDB API，并通过 MongoDB C# 驱动程序访问它。Microsoft 的官方 SDK 仅适用于 Cosmos DB SQL API（DocumentDB），它内置了自动重试写入操作的功能，并使用 DocumentClientException.RetryAfter 属性作为重试前的等待时间。但是，使用 MongoDB 驱动程序时无法获得该属性，它只返回如下错误：

Command insert failed: Message: {"Errors":["Request rate is large"]}.

目前，发生写入异常时我们固定等待 1 秒再重试。这种做法不够优雅：每次批量写入的重试延迟都是固定值，而不是根据服务器实际要求的等待时间来调整，因此我们有时会比必要的时间更晚才写入下一批记录。

我们应该如何处理这种情况？

4

1 回答 1

1

在 Cosmos DB 中，使用 V3.2 MongoDB 端点时不会返回任何重试信息，您基本上只能猜测等待时间，和/或使用某种指数退避。在这种情况下需要处理的异常包括：MongoCommandException 和 MongoExecutionTimeoutException。

在 V3.6 中，MongoDB 端点确实会在 MongoBulkWriteException 或 MongoWriteException 中返回 RetryAfterMs= 信息。不幸的是，据我所知，没有任何属性包含此值，因此必须从错误消息中解析它。注意：在非英语系统上这可能会出问题，因为错误消息可能会被本地化（翻译）。

下面是我使用 Polly 实现的代码——两个端点版本都受支持，并且 V3.6 策略向后兼容 V3.2。

显然,您可能希望根据您的场景调整一些值。

    /// <summary>
    /// Polly retry policies for the MongoDB C# driver when it talks to an Azure Cosmos DB
    /// MongoDB API endpoint. Throttled requests are recognised by error code
    /// <see cref="RateLimitCode"/>. For write exceptions, the <c>RetryAfterMs=</c> hint embedded in
    /// the exception message (V3.6 endpoints) is honoured when present. Otherwise a capped
    /// exponential back-off with jitter is used.
    /// </summary>
    public static class Policies
    {
        /// <summary>HTTP 429 (Too Many Requests) as reported in a command result's <c>StatusCode</c>; retryable.</summary>
        public const int HttpThrottleErrorCode = 429;

        /// <summary>A <c>StatusCode</c> value this class treats as retryable.</summary>
        public const int HttpServiceIsUnavailable = 1;

        /// <summary>Code treated as "operation exceeded time limit"; retryable.</summary>
        public const int HttpOperationExceededTimeLimit = 50;

        /// <summary>Error code used for "Request rate is large" (throttling) responses.</summary>
        public const int RateLimitCode = 16500;

        /// <summary>Token that precedes the server-suggested wait time (milliseconds) in error messages.</summary>
        public const string RetryAfterToken = "RetryAfterMs=";

        /// <summary>Maximum number of retries performed by each individual policy.</summary>
        public const int MaxRetries = 10;

        public static readonly int RetryAfterTokenLength = RetryAfterToken.Length;

        // System.Random is not thread-safe: concurrent Next() calls can corrupt its internal state
        // (after which it may keep returning 0). These policies are static and shared by all callers,
        // so every use of JitterSeed must go through NextJitterMilliseconds, which serialises access.
        // NOTE: JitterLock must stay declared before the policy fields below (static init order).
        private static readonly Random JitterSeed = new Random();
        private static readonly object JitterLock = new object();

        /// <summary>A policy that executes the delegate once, without any retry.</summary>
        public static readonly IAsyncPolicy NoPolicy = Policy.NoOpAsync();

        /// <summary>
        /// Creates a sleep-duration provider for Polly's WaitAndRetry.
        /// </summary>
        /// <param name="exponentialBackoffInSeconds">
        /// Back-off base: attempt <c>n</c> waits <c>base^n</c> seconds (e.g. 2, 4, 8 for base 2).
        /// </param>
        /// <param name="maxBackoffTimeInSeconds">Upper bound for the exponential part, in seconds.</param>
        /// <returns>
        /// A function mapping the 1-based retry attempt to
        /// <c>min(base^attempt, max)</c> seconds plus 0–999 ms of random jitter.
        /// </returns>
        public static Func<int, TimeSpan> SleepDurationProviderWithJitter(double exponentialBackoffInSeconds, int maxBackoffTimeInSeconds)
        {
            return retryAttempt =>
            {
                var backoffSeconds = Math.Min(Math.Pow(exponentialBackoffInSeconds, retryAttempt), maxBackoffTimeInSeconds);
                // Jitter spreads out retries from concurrent clients so they do not hit the server in lockstep.
                return TimeSpan.FromSeconds(backoffSeconds)
                       + TimeSpan.FromMilliseconds(NextJitterMilliseconds(0, 1000));
            };
        }

        /// <summary>Default back-off: base 1.5 (1.5 s, 2.25 s, 3.4 s, ...), capped at 23 s, plus jitter.</summary>
        public static readonly Func<int, TimeSpan> DefaultSleepDurationProviderWithJitter =
            SleepDurationProviderWithJitter(1.5, 23);


        /// <summary>
        /// Retries <c>MongoCommandException</c>s carrying <see cref="RateLimitCode"/>. When the result
        /// document has a <c>StatusCode</c>, only 429 / 1 / 50 are retried. Otherwise an <c>IsValid</c>
        /// flag, if present, decides. With neither field, the exception is retried.
        /// </summary>
        public static readonly IAsyncPolicy MongoCommandExceptionPolicy = Policy
            .Handle<MongoCommandException>(e =>
            {
                if (e.Code != RateLimitCode || !(e.Result is BsonDocument bsonDocument))
                {
                    return false;
                }

                if (bsonDocument.TryGetValue("StatusCode", out var statusCode) && statusCode.IsInt32)
                {
                    switch (statusCode.AsInt32)
                    {
                        case HttpThrottleErrorCode:
                        case HttpServiceIsUnavailable:
                        case HttpOperationExceededTimeLimit:
                            return true;
                        default:
                            return false;
                    }
                }

                if (bsonDocument.TryGetValue("IsValid", out var isValid) && isValid.IsBoolean)
                {
                    return isValid.AsBoolean;
                }

                return true;
            })
            .WaitAndRetryAsync(
                retryCount: MaxRetries,
                DefaultSleepDurationProviderWithJitter
            );

        /// <summary>Retries <c>MongoExecutionTimeoutException</c>s whose code is 16500 or 50.</summary>
        public static readonly IAsyncPolicy ExecutionTimeoutPolicy = Policy
            .Handle<MongoExecutionTimeoutException>(e =>
                e.Code == RateLimitCode || e.Code == HttpOperationExceededTimeLimit
            )
            .WaitAndRetryAsync(
                retryCount: MaxRetries,
                DefaultSleepDurationProviderWithJitter
            );

        /// <summary>
        /// Retries throttled <c>MongoWriteException</c>s. The wait is taken from the
        /// <c>RetryAfterMs=</c> hint in the message (or the inner exception's message) when present.
        /// Otherwise it falls back to the default exponential back-off.
        /// </summary>
        public static readonly IAsyncPolicy MongoWriteExceptionPolicy = Policy
            .Handle<MongoWriteException>(e =>
            {
                return e.WriteError?.Code == RateLimitCode
                       || (e.InnerException is MongoBulkWriteException bulkException &&
                           bulkException.WriteErrors.Any(error => error.Code == RateLimitCode));
            })
            .WaitAndRetryAsync(
                retryCount: MaxRetries,
                sleepDurationProvider: (retryAttempt, e, ctx) =>
                {
                    var timeToWaitInMs = ExtractTimeToWait(e.Message);
                    if (!timeToWaitInMs.HasValue && e.InnerException != null)
                    {
                        timeToWaitInMs = ExtractTimeToWait(e.InnerException.Message);
                    }
                    return timeToWaitInMs ?? DefaultSleepDurationProviderWithJitter(retryAttempt);
                },
                // No-op callback; required by this WaitAndRetryAsync overload.
                onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
            );

        /// <summary>
        /// Retries <c>MongoBulkWriteException</c>s containing at least one throttled write error.
        /// The wait comes from the <c>RetryAfterMs=</c> hint when present, else from the default back-off.
        /// </summary>
        public static readonly IAsyncPolicy MongoBulkWriteExceptionPolicy = Policy
            .Handle<MongoBulkWriteException>(e =>
            {
                return e.WriteErrors.Any(error => error.Code == RateLimitCode);
            })
            .WaitAndRetryAsync(
                retryCount: MaxRetries,
                sleepDurationProvider: (retryAttempt, e, ctx) =>
                {
                    var timeToWaitInMs = ExtractTimeToWait(e.Message);
                    return timeToWaitInMs ?? DefaultSleepDurationProviderWithJitter(retryAttempt);
                },
                // No-op callback; required by this WaitAndRetryAsync overload.
                onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
            );

        /// <summary>
        /// Extracts the server-suggested wait from a message containing <c>RetryAfterMs=&lt;digits&gt;</c>.
        /// RetryAfterMs does not appear to be exposed as a property, so it has to be parsed from text.
        /// </summary>
        /// <param name="messageToParse">Exception message to search; may be null or empty.</param>
        /// <returns>
        /// The parsed wait plus 100–999 ms of jitter, or <c>null</c> when the token is absent or is not
        /// followed by a non-negative integer that fits in an <see cref="int"/>.
        /// </returns>
        private static TimeSpan? ExtractTimeToWait(string messageToParse)
        {
            if (string.IsNullOrEmpty(messageToParse))
            {
                return null;
            }

            var tokenPos = messageToParse.IndexOf(RetryAfterToken, StringComparison.OrdinalIgnoreCase);
            if (tokenPos < 0)
            {
                return null;
            }

            // Skip optional whitespace after '=', then take the run of ASCII digits. Scanning digits
            // (instead of searching for the next ',') also handles a hint at the very end of the message.
            var start = tokenPos + RetryAfterTokenLength;
            while (start < messageToParse.Length && char.IsWhiteSpace(messageToParse[start]))
            {
                start++;
            }

            var end = start;
            while (end < messageToParse.Length && messageToParse[end] >= '0' && messageToParse[end] <= '9')
            {
                end++;
            }

            // Invariant culture: this is machine-generated text, not user input. Overflow -> TryParse fails.
            if (end == start
                || !int.TryParse(
                    messageToParse.Substring(start, end - start),
                    System.Globalization.NumberStyles.None,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out var timeToWaitInMs))
            {
                return null;
            }

            return TimeSpan.FromMilliseconds(timeToWaitInMs)
                   + TimeSpan.FromMilliseconds(NextJitterMilliseconds(100, 1000));
        }

        /// <summary>
        /// Thread-safe access to the shared <see cref="JitterSeed"/>; returns a value in
        /// [<paramref name="minInclusive"/>, <paramref name="maxExclusive"/>).
        /// </summary>
        private static int NextJitterMilliseconds(int minInclusive, int maxExclusive)
        {
            lock (JitterLock)
            {
                return JitterSeed.Next(minInclusive, maxExclusive);
            }
        }

        /// <summary>
        /// Use this policy if your CosmosDB MongoDB endpoint is V3.2
        /// </summary>
        public static readonly IAsyncPolicy DefaultPolicyForMongo3_2 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy);

        /// <summary>
        /// Use this policy if your CosmosDB MongoDB endpoint is V3.6 or V3.2
        /// </summary>
        public static readonly IAsyncPolicy DefaultPolicyForMongo3_6 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy, MongoWriteExceptionPolicy, MongoBulkWriteExceptionPolicy);
    }

    /// <summary>
    /// Settable retry policy, initialised to <c>Policies.DefaultPolicyForMongo3_6</c>
    /// (which also covers V3.2 endpoints). Callers can assign a different policy,
    /// e.g. <c>Policies.DefaultPolicyForMongo3_2</c> or <c>Policies.NoPolicy</c>.
    /// NOTE(review): presumably MongoDB calls are wrapped with this policy elsewhere; confirm at call sites.
    /// </summary>
    public static IAsyncPolicy DefaultPolicy { get; set; } = Policies.DefaultPolicyForMongo3_6;
于 2020-02-17T14:02:30.263 回答