1

我正在尝试从异步客户端创建一种从 CosmosDB 检索项目的方法,但恐怕我充满了问题,而且微软方面几乎没有文档

我创建了一个函数,该函数将从 cosmosDB 中逐页读取项目列表,其延续将取决于连续性令牌。方法看起来像这样。请注意,可能存在一些与逐页阅读的核心功能无关的小错误:

  @FunctionName("Feed")
  public HttpResponseMessage getFeed(
    @HttpTrigger(
      name = "get",
      methods = { HttpMethod.GET },
      authLevel = AuthorizationLevel.ANONYMOUS,
      route = "Feed"
    ) final HttpRequestMessage<Optional<String>> request,
    @CosmosDBInput(
      name = "Feed",
      databaseName = Constants.DATABASE_NAME,
      collectionName = Constants.LOG_COLLECTION_NAME,
      sqlQuery = "SELECT * FROM c",  // This won't be used actually as we use our own query
      connectionStringSetting = Constants.CONNECTION_STRING_KEY 
    ) final LogEntry[] logEntryArray,
    final ExecutionContext context
  ) {
      context
      .getLogger()
      .info("Query with paging and continuation token");
      String query = "SELECT * FROM " + EnvironmentConstants.LOG_COLLECTION_NAME;

      int pageSize = 10; //No of docs per page
      int currentPageNumber = 1;
      int documentNumber = 0;
      String continuationToken = null;

      double requestCharge = 0.0;

      // First iteration (continuationToken = null): Receive a batch of query response pages
      // Subsequent iterations (continuationToken != null): Receive subsequent batch of query response pages, with continuationToken indicating where the previous iteration left off
      do {
          context
          .getLogger()
          .info("Receiving a set of query response pages.");
          context
          .getLogger()
          .info("Continuation Token: " + continuationToken + "\n");

          CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();

          Flux<FeedResponse<LogEntry>> feedResponseIterator =
                  container.queryItems(query, queryOptions, LogEntry.class).byPage(continuationToken,pageSize);

          try {

              feedResponseIterator.flatMap(fluxResponse -> {
                  context
                  .getLogger()
                  .info("Got a page of query result with " +
                          fluxResponse.getResults().size() + " items(s)"
                          + " and request charge of " + fluxResponse.getRequestCharge());

                  context
                  .getLogger()
                  .info("Item Ids " + fluxResponse
                          .getResults()
                          .stream()
                          .map(LogEntry::getDate)
                          .collect(Collectors.toList()));

                  return Flux.empty();
              }).blockLast();

          } catch (Exception e) {
              
          }

      } while (continuationToken != null);

      context
      .getLogger()
      .info(String.format("Total request charge: %f\n", requestCharge));
      return request
                .createResponseBuilder(HttpStatus.OK)
                .header("Content-Type", "application/json")
                .body("ALL READ")
                .build();
  }  

为简单起见,仅记录读取的项目。

第一个问题:我们正在使用一个返回 Flux 的异步文档客户端。客户端会跟踪令牌吗?原则上它是一个无国籍的客户。我知道同步客户端可以轻松处理这种情况,但是异步客户端不会在生成第一页和令牌后重置其令牌内存吗?

第二:while 循环是否适当?我的假设是否定的,因为我们需要在标头中发回令牌,而前端 UI 需要以标头或其他类似方式将令牌发送到 Azure 函数。然后应该从上下文中提取令牌

第三:flatMap和blockList方式读取通量合适吗?我试图使用 subscribe 方法,但我再次看不到它如何适用于异步客户端。

非常感谢,亚历克斯。

4

0 回答 0