概述 我有两个微服务,分别称为仓库微服务(服务器)和杂货微服务(客户端)。我正在尝试使用 akka-grpc 库在它们之间实现流请求和流响应。但是在我的杂货微服务(客户端)中获取流响应时遇到了问题。下面是客户端代码说明。
问题描述:
我有一个名为responseStream
Source 类型的变量,我在其上累积了一些元素。然后我想返回这个 Source 中存在的元素的未来。
以下是我的功能片段:
def streamingBroadcast(itemIds: List[Long]): Future[List[ItemStockAvailabilityDto]] = {
logger.debug("Performing streaming requests")
val itemIter = itemIds.toIterator
val requestStream: Source[CheckStockRequest, NotUsed] =
Source
.fromIterator(() => itemIter)
.map { itemId => CheckStockRequest(itemId, 1) }
.mapMaterializedValue(_ => NotUsed)
val responseStream: Source[CheckStockReply, NotUsed] = client.checkStockToAll(requestStream)
responseStream.runFoldAsync(List.empty[ItemStockAvailabilityDto])((acc, item) => {
logger.debug(s"got streaming reply for id: ${item.itemId} and qty: ${item.qtyAvailable}")
Future.successful(acc :+ ItemStockAvailabilityDto(item.itemId, item.qtyAvailable))
})
}
然后我有另一个方法被调用getQuote
,我调用的streamingBroadcast
方法看起来像这样 -
def getQuote(itemIds: List[Long], quantityRequested: Int): Future[List[GroceryItemAvailabilityDto]] = {
streamingBroadcast(itemIds) map {
case stockList: List[ItemStockAvailabilityDto] if stockList.nonEmpty =>
logger.debug("we have some results for items " + stockList.map(_.itemId).mkString(","))
stockList map { stock =>
catalog.get(stock.itemId) match {
case Some(item) =>
val amt = if (stock.quantityAvailable < quantityRequested) stock.quantityAvailable else quantityRequested
val msg = if (amt < quantityRequested) Some("We are unable to supply all you requested") else Some("We await your order")
GroceryItemAvailabilityDto(stock.itemId, item.name, item.price, item.price * amt, quantityRequested, stock.quantityAvailable, msg)
case None => throw new RuntimeException("item not found in warehouse: " + stock.itemId)
}
}
case Nil => throw new RuntimeException("none of the items were found in warehouse: " + itemIds.mkString(","))
}
}
问题
我面临的问题是代码执行没有从该getQuote
方法返回任何值。它可能只是挂在里面getQuote
,另一个等待的演员正在超时。也许是因为该方法内部的模式匹配?我还尝试调试代码以确保列表中至少存在一些元素。
responseStream
在创建或访问其中的元素时我做错了什么吗?如果您需要更多信息或仓库微服务(服务器)的代码片段,请告诉我。