0

我们使用 Azure cosmos DB 来保存作业处理管道的状态信息。为此,我们使用表 API 和相应的 SDK。最近发现系统经常出现429-Request rate is too large错误。我们的事务 DTU 利用率远低于表上配置的最大值,但我们注意到在指标选项卡下,枚举表等操作使用的系统 DTU 已用尽,因此为 429。

我们最初修复了删除“CreateIfNotExists”方法调用,帮助修复了一段时间,但最近我们又开始遇到这个问题(尽管不像以前那么频繁)。很难调试/解决这个问题,因为我找不到足够的文档来说明哪个 SDK 方法调用耗尽了这个不可扩展的资源。我已在 CosmosDB 实例上启用日志记录,但我不确定要在日志中查找什么来解决此问题

这是我们用于与 Azure Cosmos DB 交互的单例类

public class CosmosDbTableFacade : ICosmosDbTableFacade
{
        /// <summary>
        /// Initializes a new instance of the <see cref="CosmosDbTableFacade"/> class.
        /// </summary>
        /// <param name="connectionString">
        /// The connection string.
        /// </param>
        /// <param name="tableName">
        /// The table name.
        /// </param>
        public CosmosDbTableFacade(string connectionString)
        {
            var storageAccount = CloudStorageAccount.Parse(connectionString);
            this.CosmosTableClient = storageAccount.CreateCloudTableClient();
        }

        /// <summary>
        /// Gets or sets the cosmos table.
        /// </summary>
        public CloudTableClient CosmosTableClient { get; set; }

        /// <summary>
        /// The execute async.
        /// </summary>
        /// <param name="tableName">
        /// The table Name.
        /// </param>
        /// <param name="operation">
        /// The operation.
        /// </param>
        /// <returns>
        /// The <see cref="Task"/>.
        /// </returns>
        public Task<TableResult> ExecuteAsync(string tableName, TableOperation operation)
        {
            var table = this.CosmosTableClient.GetTableReference(tableName);
            return table.ExecuteAsync(operation);
        }

        /// <summary>
        /// The execute query segmented async.
        /// </summary>
        /// <param name="tableName">
        /// The table name.
        /// </param>
        /// <param name="query">
        /// The query.
        /// </param>
        /// <param name="continuationToken">
        /// The continuation token.
        /// </param>
        /// <returns>
        /// The <see cref="Task"/> which returns the list of entities.
        /// </returns>
        public Task<TableQuerySegment<DynamicTableEntity>> ExecuteQuerySegmentedAsync(string tableName, TableQuery query, TableContinuationToken continuationToken)
        {
            var table = this.CosmosTableClient.GetTableReference(tableName);
            return table.ExecuteQuerySegmentedAsync(query, continuationToken);
        }
}

以下片段列出了我们正在使用的不同查询 -

public async Task InsertOrMergeEntityAsync<T>(string tableName, T entity)
            where T : TableEntity
{
            var insertOrMergeOperation = TableOperation.InsertOrMerge(entity);
            var result = await this.CosmosDbTableFacade.ExecuteAsync(tableName, insertOrMergeOperation).ConfigureAwait(false);
            ValidateCosmosTableResult(result, "Failed to write to Cosmos Table");
}

public async Task<T> GetEntityAsync<T>(string tableName, string partitionKey, string rowKey)
            where T : TableEntity
{
            var retrieveOperation = TableOperation.Retrieve<T>(partitionKey, rowKey);
            TableResult result = await this.CosmosDbTableFacade.ExecuteAsync(tableName, retrieveOperation).ConfigureAwait(false);
            ValidateCosmosTableResult(result, "Failed to read from Cosmos Table");
            return result.Result as T;
}

public async Task<IEnumerable<T>> GetEntitiesAsync<T>(string tableName, string filterCondition)
            where T : TableEntity
{
            var query = new TableQuery().Where(filterCondition);
            var continuationToken = default(TableContinuationToken);
            var results = new List<T>();
            do
            {
                var currentQueryResults = await this.CosmosDbTableFacade.ExecuteQuerySegmentedAsync(tableName, query, continuationToken).ConfigureAwait(false);
                results.AddRange(currentQueryResults.Select(currentQueryResult =>
                    {
                        var currentEntity = TableEntity.ConvertBack<T>(currentQueryResult.Properties, null);
                        currentEntity.RowKey = currentQueryResult.RowKey;
                        currentEntity.PartitionKey = currentQueryResult.PartitionKey;
                        currentEntity.Timestamp = currentQueryResult.Timestamp;
                        currentEntity.ETag = currentQueryResult.ETag;
                        return currentEntity;
                    }));
                continuationToken = currentQueryResults.ContinuationToken;
            }
            while (continuationToken != null);

            return results;
}

下面最后一个方法中的过滤器,包含一个分区键和一个自定义列

4

1 回答 1

1

对于遇到类似问题的任何人,在我的案例中,元数据 DTU 节流的根本原因是:GetTableReference(tableName)方法(通过部署更改并将该行移至启动代码并监控 DTU 利用率来找到)。我有这个,以便我可以在运行时动态指向要读取/写入的表,但由于这正在消耗元数据 DTU,我将代码更改为使用单例作为表引用。

于 2020-06-26T05:31:30.697 回答