问题标题可能信息量不大,因为我正在尝试实现多种功能。我想根据他发送的标头授权调用者并将此信息传播到 gRPC 方法处理程序。问题在于授权过程的异步性质。我最终得到了这个:
case class AsyncContextawareInterceptor[A](
f: Metadata ⇒ Future[Either[Status, (Context.Key[A], A)]]
)(implicit val system: ActorSystem)
extends ServerInterceptor
with AnyLogging {
import system.dispatcher
sealed trait Msg
case object HalfClose extends Msg
case object Cancel extends Msg
case object Complete extends Msg
case object Ready extends Msg
case class Message[T](msg: T) extends Msg
override def interceptCall[ReqT, RespT](call: ServerCall[ReqT, RespT],
headers: Metadata,
next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] =
new ServerCall.Listener[ReqT] {
private val stash = new java.util.concurrent.ConcurrentLinkedQueue[Msg]()
private var interceptor: Option[ServerCall.Listener[ReqT]] = None
private def enqueueAndProcess(msg: Msg) =
if (interceptor.isDefined) processMessage(msg) else stash.add(msg)
private def processMessage(msg: Msg) = msg match {
case HalfClose ⇒ interceptor.foreach(_.onHalfClose)
case Cancel ⇒ interceptor.foreach(_.onCancel)
case Complete ⇒ interceptor.foreach(_.onComplete)
case Ready ⇒ interceptor.foreach(_.onReady)
case Message(msg: ReqT @unchecked) ⇒ interceptor.foreach(_.onMessage(msg))
}
private def processMessages() = while (!stash.isEmpty) {
Option(stash.poll).foreach(processMessage)
}
override def onHalfClose(): Unit = enqueueAndProcess(HalfClose)
override def onCancel(): Unit = enqueueAndProcess(Cancel)
override def onComplete(): Unit = enqueueAndProcess(Complete)
override def onReady(): Unit = enqueueAndProcess(Ready)
override def onMessage(message: ReqT): Unit = enqueueAndProcess(Message(message))
f(headers).map {
case Right((k, v)) ⇒
val context = Context.current.withValue(k, v)
interceptor = Some(Contexts.interceptCall(context, call, headers, next))
processMessages()
case Left(status) ⇒ call.close(status, new Metadata())
}.recover {
case t: Throwable ⇒
log.error(t, "AsyncContextawareInterceptor future failed")
call.close(Status.fromThrowable(t), new Metadata())
}
}
}
object AuthInterceptor {
val BOTID_CONTEXT_KEY: Context.Key[Int] = Context.key[Int]("botId")
val TOKEN_HEADER_KEY: Metadata.Key[String] = Metadata.Key.of[String]("token", Metadata.ASCII_STRING_MARSHALLER)
def authInterceptor(resolver: String ⇒ Future[Option[Int]])(implicit system: ActorSystem): ServerInterceptor =
AsyncContextawareInterceptor { metadata ⇒
import system.dispatcher
(for {
token ← OptionT.fromOption[Future](Option(metadata.get(TOKEN_HEADER_KEY)))
botId ← OptionT(resolver(token))
} yield botId).value.map {
case Some(id) ⇒ Right(BOTID_CONTEXT_KEY → id)
case None ⇒ Left(Status.PERMISSION_DENIED)
}
}
}
这有效(我的意思是,运行没有异常:)),但是当我AuthInterceptor.BOTID_CONTEXT_KEY.get
在我的方法处理程序中这样做时,它会产生null
.
也许,有更好的方法来处理异步的东西?