我正在尝试从异步客户端创建一种从 CosmosDB 检索项目的方法,但恐怕我充满了问题,而且微软方面几乎没有文档
我创建了一个函数,该函数将从 cosmosDB 中逐页读取项目列表,其延续将取决于连续性令牌。方法看起来像这样。请注意,可能存在一些与逐页阅读的核心功能无关的小错误:
@FunctionName("Feed")
public HttpResponseMessage getFeed(
@HttpTrigger(
name = "get",
methods = { HttpMethod.GET },
authLevel = AuthorizationLevel.ANONYMOUS,
route = "Feed"
) final HttpRequestMessage<Optional<String>> request,
@CosmosDBInput(
name = "Feed",
databaseName = Constants.DATABASE_NAME,
collectionName = Constants.LOG_COLLECTION_NAME,
sqlQuery = "SELECT * FROM c", // This won't be used actually as we use our own query
connectionStringSetting = Constants.CONNECTION_STRING_KEY
) final LogEntry[] logEntryArray,
final ExecutionContext context
) {
context
.getLogger()
.info("Query with paging and continuation token");
String query = "SELECT * FROM " + EnvironmentConstants.LOG_COLLECTION_NAME;
int pageSize = 10; //No of docs per page
int currentPageNumber = 1;
int documentNumber = 0;
String continuationToken = null;
double requestCharge = 0.0;
// First iteration (continuationToken = null): Receive a batch of query response pages
// Subsequent iterations (continuationToken != null): Receive subsequent batch of query response pages, with continuationToken indicating where the previous iteration left off
do {
context
.getLogger()
.info("Receiving a set of query response pages.");
context
.getLogger()
.info("Continuation Token: " + continuationToken + "\n");
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
Flux<FeedResponse<LogEntry>> feedResponseIterator =
container.queryItems(query, queryOptions, LogEntry.class).byPage(continuationToken,pageSize);
try {
feedResponseIterator.flatMap(fluxResponse -> {
context
.getLogger()
.info("Got a page of query result with " +
fluxResponse.getResults().size() + " items(s)"
+ " and request charge of " + fluxResponse.getRequestCharge());
context
.getLogger()
.info("Item Ids " + fluxResponse
.getResults()
.stream()
.map(LogEntry::getDate)
.collect(Collectors.toList()));
return Flux.empty();
}).blockLast();
} catch (Exception e) {
}
} while (continuationToken != null);
context
.getLogger()
.info(String.format("Total request charge: %f\n", requestCharge));
return request
.createResponseBuilder(HttpStatus.OK)
.header("Content-Type", "application/json")
.body("ALL READ")
.build();
}
为简单起见,仅记录读取的项目。
第一个问题:我们正在使用一个返回 Flux 的异步文档客户端。客户端会跟踪令牌吗?原则上它是一个无国籍的客户。我知道同步客户端可以轻松处理这种情况,但是异步客户端不会在生成第一页和令牌后重置其令牌内存吗?
第二:while 循环是否适当?我的假设是否定的,因为我们需要在标头中发回令牌,而前端 UI 需要以标头或其他类似方式将令牌发送到 Azure 函数。然后应该从上下文中提取令牌
第三:flatMap和blockList方式读取通量合适吗?我试图使用 subscribe 方法,但我再次看不到它如何适用于异步客户端。
非常感谢,亚历克斯。