9

问题标题可能信息量不大,因为我正在尝试实现多种功能。我想根据他发送的标头授权调用者并将此信息传播到 gRPC 方法处理程序。问题在于授权过程的异步性质。我最终得到了这个:

case class AsyncContextawareInterceptor[A](
    f: Metadata ⇒ Future[Either[Status, (Context.Key[A], A)]]
)(implicit val system: ActorSystem)
    extends ServerInterceptor
    with AnyLogging {
  import system.dispatcher

  sealed trait Msg
  case object HalfClose extends Msg
  case object Cancel extends Msg
  case object Complete extends Msg
  case object Ready extends Msg
  case class Message[T](msg: T) extends Msg

  override def interceptCall[ReqT, RespT](call: ServerCall[ReqT, RespT],
                                          headers: Metadata,
                                          next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] =
    new ServerCall.Listener[ReqT] {
      private val stash = new java.util.concurrent.ConcurrentLinkedQueue[Msg]()
      private var interceptor: Option[ServerCall.Listener[ReqT]] = None

      private def enqueueAndProcess(msg: Msg) =
        if (interceptor.isDefined) processMessage(msg) else stash.add(msg)

      private def processMessage(msg: Msg) = msg match {
        case HalfClose ⇒ interceptor.foreach(_.onHalfClose)
        case Cancel ⇒ interceptor.foreach(_.onCancel)
        case Complete ⇒ interceptor.foreach(_.onComplete)
        case Ready ⇒ interceptor.foreach(_.onReady)
        case Message(msg: ReqT @unchecked) ⇒ interceptor.foreach(_.onMessage(msg))
      }

      private def processMessages() = while (!stash.isEmpty) {
        Option(stash.poll).foreach(processMessage)
      }

      override def onHalfClose(): Unit = enqueueAndProcess(HalfClose)

      override def onCancel(): Unit = enqueueAndProcess(Cancel)

      override def onComplete(): Unit = enqueueAndProcess(Complete)

      override def onReady(): Unit = enqueueAndProcess(Ready)

      override def onMessage(message: ReqT): Unit = enqueueAndProcess(Message(message))

      f(headers).map {
        case Right((k, v)) ⇒
          val context = Context.current.withValue(k, v)
          interceptor = Some(Contexts.interceptCall(context, call, headers, next))
          processMessages()
        case Left(status) ⇒ call.close(status, new Metadata())
      }.recover {
        case t: Throwable ⇒
          log.error(t, "AsyncContextawareInterceptor future failed")
          call.close(Status.fromThrowable(t), new Metadata())
      }
    }
}

object AuthInterceptor {
  val BOTID_CONTEXT_KEY: Context.Key[Int] = Context.key[Int]("botId")
  val TOKEN_HEADER_KEY: Metadata.Key[String] = Metadata.Key.of[String]("token", Metadata.ASCII_STRING_MARSHALLER)

  def authInterceptor(resolver: String ⇒ Future[Option[Int]])(implicit system: ActorSystem): ServerInterceptor =
    AsyncContextawareInterceptor { metadata ⇒
      import system.dispatcher
      (for {
        token ← OptionT.fromOption[Future](Option(metadata.get(TOKEN_HEADER_KEY)))
        botId ← OptionT(resolver(token))
      } yield botId).value.map {
        case Some(id) ⇒ Right(BOTID_CONTEXT_KEY → id)
        case None ⇒ Left(Status.PERMISSION_DENIED)
      }
    }
}

这有效(我的意思是,运行没有异常:)),但是当我AuthInterceptor.BOTID_CONTEXT_KEY.get在我的方法处理程序中这样做时,它会产生null.

也许,有更好的方法来处理异步的东西?

4

1 回答 1

1

虽然整个 grpc 上下文传播依赖于 ThreadLocal 存储,但由于它具有线程感知特性,它在 java 中运行良好,但它在 scala 中中断,您没有明确知道在非阻塞存根中实际执行客户端拦截器的线程。

为了解决这个问题,我将 Context 存储在我传递给存根创建的 CallOption 中:

MyServiceGrpc.stub(channel).withOption(<CallOption.Key>, context)

然后在客户端拦截器本身中,我从 callOptions 中获取了 Context :

val context:Context = callOptions.getOption(<CallOption.Key>)

从那里可以在标头上设置上下文值,以便可以从 ServerInterceptors 访问它们

这显然不是最优雅的,但它解决了问题并且有效

于 2018-07-20T15:46:58.350 回答