每当我们声明流式 API 时使用 GRPC
rpc heartBeat(Empty) returns (stream ServiceStatus){}
我们有用于观察者模式的谷歌简单界面StreamObserver
(这是 protobuf 将为我们生成的)
public interface StreamObserver<V> {
void onNext(V var1);
void onError(Throwable var1);
void onCompleted();
}
现在您要做的是将其转换为实际值Observable
,然后再将其传递以供进一步使用。
override fun heartBeat(arg: Empty): Observable<ServiceStatus> {
// we create rx java subject
val subject = PublishSubject.create<ServiceStatus>()
// we create grpc observer and delegate all calls to rx java
val observer = object : StreamObserver<ServiceStatus> {
override fun onNext(value: ServiceStatus) {
subject.onNext(value)
}
override fun onError(error: Throwable) {
subject.onError(error)
}
override fun onCompleted() {
subject.onCompleted()
}
}
// we use grpc observer for generated api
asyncStub.heartBeat(arg, observer)
// but we pass rx observable (subject) to client code
return subject
}
现在我是 Kotlin 的新手,但我不知道现有的委托功能有没有办法让 StreamObserver 的主题委托? 在 Kotlin 中编写这段代码是否有更具表现力的方式?