0

每当我们声明流式 API 时使用 GRPC

rpc heartBeat(Empty) returns (stream ServiceStatus){}

我们有用于观察者模式的谷歌简单界面StreamObserver(这是 protobuf 将为我们生成的)

public interface StreamObserver<V> {
  void onNext(V var1);

  void onError(Throwable var1);

  void onCompleted();
}

现在您要做的是将其转换为实际值Observable,然后再将其传递以供进一步使用。

override fun heartBeat(arg: Empty): Observable<ServiceStatus> {
    // we create rx java subject
    val subject = PublishSubject.create<ServiceStatus>()

    // we create grpc observer and delegate all calls to rx java
    val observer = object : StreamObserver<ServiceStatus> {

        override fun onNext(value: ServiceStatus) {
            subject.onNext(value)
        }

        override fun onError(error: Throwable) {
            subject.onError(error)
        }

        override fun onCompleted() {
            subject.onCompleted()
        }
    }

    // we use grpc observer for generated api
    asyncStub.heartBeat(arg, observer)

    // but we pass rx observable (subject) to client code
    return subject
}

现在我是 Kotlin 的新手,但我不知道现有的委托功能有没有办法让 StreamObserver 的主题委托? 在 Kotlin 中编写这段代码是否有更具表现力的方式?

4

1 回答 1

6

我将创建一个通用方法,该方法创建一个StreamObserver,将其传递给它的 lambda 参数并将结果包装在Observable.

inline fun <T> asObservable(
    crossinline body: (StreamObserver<T>) -> Unit): Observable<T> {
  return Observable.create { subscription ->
    val observer = object : StreamObserver<T> {
      override fun onNext(value: T) {
        subscription.onNext(value)
      }

      override fun onError(error: Throwable) {
        subscription.onError(error)
      }

      override fun onCompleted() {
        subscription.onCompleted()
      }
    }
    body(observer)
  }
}

然后您可以通过以下方式实现 RPC 方法。

override fun heartBeat(arg: Empty): Observable<ServiceStatus> =
    asObservable { asyncStub.heartBeat(arg, it) } 
于 2016-04-14T11:26:08.627 回答