我正在尝试创建一个提供 OAuth2 令牌并且还负责刷新过期令牌的 Source。目前我的代码看起来有点像这样
case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)){
def expired = Instant.now().isAfter(expires)
}
Source
.repeat()
.mapAsync(1){ _ =>
println(" -> token req")
// this fakes an async token request to the token service
Future{
Thread.sleep(500)
println(" <- token resp")
Token()
}
}
.mapAsync(1){ token =>
println(" -> req with token auth")
if(token.expired){
println("!!! Received expired token")
}
// this is the actual call that needs the token
println("making call")
Future{
Thread.sleep(2000)
println(" <- req resp")
"OK"
}
}
.take(2)
.runWith(Sink.ignore)
.recover{case e => ()}
.flatMap{ _ =>
system.terminate()
}
此代码的输出如下所示
root -> token req
root <- token resp
root -> token req
root -> req with token auth
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root ... finished with exit code 0
显然,这个 mapAsync(1) 在不期望的时候产生了需求(预取?)
有2个问题:
- 需求导致上游不需要的令牌请求
- 令牌的预取/缓存是有问题的,因为它们仅在特定的时间内有效
那么如何创建一个行为类似于此函数的真正拉流呢?
def tokenSource: () => Future[Token]