我有 api 得到这样的请求:
case class UsersRequest(ids: List[Long])
并返回这样的响应:
case class UsersInfoResponse(info: List[Info])
case class Info(userId: Long, info: String)
另外,我有发送此请求并创建用户的方法:
def createUser(id: Long): IO[Throwable, User] = {
getUserInfo(id)
.map(info => User(id, info))
}
def getUserInfo(id:Long): IO[Throwable, String] = {
here i call grpc service
service.getUserInfo(UsersRequest(List(id)))
}
我想:
- 编写创建 ids 批处理的 ZStream
- 每 1 秒需要 10 个 id 并创建 UsersRequest
- 获取用户信息响应
- 使用 id 了解必须获取的信息
- 返回信息
所以我可以做到这一点,我应该创建类似的东西:
def getUserInfo(id:Long): IO[Throwable, String] = {
Stream
.fromQueue()
.groupedWithin(10, Duration.Zero)
.????
.runDrain
.forkManaged
AND
p <- Promise.make[Throwable, String]
interrupted <- Promise.make[Nothing, Unit]
env <- ZIO.environment[R]
}
我不知道我怎么能做到。如何构建批处理并发送请求并在匹配后通过 id 获得结果?