1

概述 我有两个微服务,分别称为仓库微服务(服务器)和杂货微服务(客户端)。我正在尝试使用 akka-grpc 库在它们之间实现流请求和流响应。但是在我的杂货微服务(客户端)中获取流响应时遇到了问题。下面是客户端代码说明。

问题描述: 我有一个名为responseStreamSource 类型的变量,我在其上累积了一些元素。然后我想返回这个 Source 中存在的元素的未来。

以下是我的功能片段:

def streamingBroadcast(itemIds: List[Long]): Future[List[ItemStockAvailabilityDto]] = {
      logger.debug("Performing streaming requests")

      val itemIter = itemIds.toIterator
      val requestStream: Source[CheckStockRequest, NotUsed] =
        Source
          .fromIterator(() => itemIter)
          .map { itemId => CheckStockRequest(itemId, 1) }
          .mapMaterializedValue(_ => NotUsed)
      val responseStream: Source[CheckStockReply, NotUsed] = client.checkStockToAll(requestStream)
      responseStream.runFoldAsync(List.empty[ItemStockAvailabilityDto])((acc, item) => {
        logger.debug(s"got streaming reply for id: ${item.itemId} and qty: ${item.qtyAvailable}")
        Future.successful(acc :+ ItemStockAvailabilityDto(item.itemId, item.qtyAvailable))
      })
    }

然后我有另一个方法被调用getQuote,我调用的streamingBroadcast方法看起来像这样 -

def getQuote(itemIds: List[Long], quantityRequested: Int): Future[List[GroceryItemAvailabilityDto]] = {
    streamingBroadcast(itemIds) map {
      case stockList: List[ItemStockAvailabilityDto] if stockList.nonEmpty =>
        logger.debug("we have some results for items " + stockList.map(_.itemId).mkString(","))
        stockList map { stock =>
          catalog.get(stock.itemId) match {
            case Some(item) =>
              val amt = if (stock.quantityAvailable < quantityRequested) stock.quantityAvailable else quantityRequested
              val msg = if (amt < quantityRequested) Some("We are unable to supply all you requested") else Some("We await your order")
              GroceryItemAvailabilityDto(stock.itemId, item.name, item.price, item.price * amt, quantityRequested, stock.quantityAvailable, msg)
            case None => throw new RuntimeException("item not found in warehouse: " + stock.itemId)
          }
        }
      case Nil => throw new RuntimeException("none of the items were found in warehouse: " + itemIds.mkString(","))
    }
  }

问题 我面临的问题是代码执行没有从该getQuote方法返回任何值。它可能只是挂在里面getQuote,另一个等待的演员正在超时。也许是因为该方法内部的模式匹配?我还尝试调试代码以确保列表中至少存在一些元素。

responseStream在创建或访问其中的元素时我做错了什么吗?如果您需要更多信息或仓库微服务(服务器)的代码片段,请告诉我。

4

0 回答 0