我们使用 Azure cosmos DB 来保存作业处理管道的状态信息。为此,我们使用表 API 和相应的 SDK。最近发现系统经常出现429-Request rate is too large错误。我们的事务 DTU 利用率远低于表上配置的最大值,但我们注意到在指标选项卡下,枚举表等操作使用的系统 DTU 已用尽,因此为 429。
我们最初修复了删除“CreateIfNotExists”方法调用,帮助修复了一段时间,但最近我们又开始遇到这个问题(尽管不像以前那么频繁)。很难调试/解决这个问题,因为我找不到足够的文档来说明哪个 SDK 方法调用耗尽了这个不可扩展的资源。我已在 CosmosDB 实例上启用日志记录,但我不确定要在日志中查找什么来解决此问题
这是我们用于与 Azure Cosmos DB 交互的单例类
public class CosmosDbTableFacade : ICosmosDbTableFacade
{
/// <summary>
/// Initializes a new instance of the <see cref="CosmosDbTableFacade"/> class.
/// </summary>
/// <param name="connectionString">
/// The connection string.
/// </param>
/// <param name="tableName">
/// The table name.
/// </param>
public CosmosDbTableFacade(string connectionString)
{
var storageAccount = CloudStorageAccount.Parse(connectionString);
this.CosmosTableClient = storageAccount.CreateCloudTableClient();
}
/// <summary>
/// Gets or sets the cosmos table.
/// </summary>
public CloudTableClient CosmosTableClient { get; set; }
/// <summary>
/// The execute async.
/// </summary>
/// <param name="tableName">
/// The table Name.
/// </param>
/// <param name="operation">
/// The operation.
/// </param>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
public Task<TableResult> ExecuteAsync(string tableName, TableOperation operation)
{
var table = this.CosmosTableClient.GetTableReference(tableName);
return table.ExecuteAsync(operation);
}
/// <summary>
/// The execute query segmented async.
/// </summary>
/// <param name="tableName">
/// The table name.
/// </param>
/// <param name="query">
/// The query.
/// </param>
/// <param name="continuationToken">
/// The continuation token.
/// </param>
/// <returns>
/// The <see cref="Task"/> which returns the list of entities.
/// </returns>
public Task<TableQuerySegment<DynamicTableEntity>> ExecuteQuerySegmentedAsync(string tableName, TableQuery query, TableContinuationToken continuationToken)
{
var table = this.CosmosTableClient.GetTableReference(tableName);
return table.ExecuteQuerySegmentedAsync(query, continuationToken);
}
}
以下片段列出了我们正在使用的不同查询 -
public async Task InsertOrMergeEntityAsync<T>(string tableName, T entity)
where T : TableEntity
{
var insertOrMergeOperation = TableOperation.InsertOrMerge(entity);
var result = await this.CosmosDbTableFacade.ExecuteAsync(tableName, insertOrMergeOperation).ConfigureAwait(false);
ValidateCosmosTableResult(result, "Failed to write to Cosmos Table");
}
public async Task<T> GetEntityAsync<T>(string tableName, string partitionKey, string rowKey)
where T : TableEntity
{
var retrieveOperation = TableOperation.Retrieve<T>(partitionKey, rowKey);
TableResult result = await this.CosmosDbTableFacade.ExecuteAsync(tableName, retrieveOperation).ConfigureAwait(false);
ValidateCosmosTableResult(result, "Failed to read from Cosmos Table");
return result.Result as T;
}
public async Task<IEnumerable<T>> GetEntitiesAsync<T>(string tableName, string filterCondition)
where T : TableEntity
{
var query = new TableQuery().Where(filterCondition);
var continuationToken = default(TableContinuationToken);
var results = new List<T>();
do
{
var currentQueryResults = await this.CosmosDbTableFacade.ExecuteQuerySegmentedAsync(tableName, query, continuationToken).ConfigureAwait(false);
results.AddRange(currentQueryResults.Select(currentQueryResult =>
{
var currentEntity = TableEntity.ConvertBack<T>(currentQueryResult.Properties, null);
currentEntity.RowKey = currentQueryResult.RowKey;
currentEntity.PartitionKey = currentQueryResult.PartitionKey;
currentEntity.Timestamp = currentQueryResult.Timestamp;
currentEntity.ETag = currentQueryResult.ETag;
return currentEntity;
}));
continuationToken = currentQueryResults.ContinuationToken;
}
while (continuationToken != null);
return results;
}
下面最后一个方法中的过滤器,包含一个分区键和一个自定义列