0

我有一个流服务,可以无限期地从服务器流到客户端,直到客户端取消。

在服务器端,我有一个使用来自数据库的数据填充 ehcache 的线程。

Ehcache 提供了缓存事件的回调,即添加项目时,删除项目时等。我只关心在将元素放入缓存时通知客户端,所以当客户端连接到我的 gRPC 服务时,我注册带有缓存的notifyElementPut()回调,它引用了已连接的客户端StreamObserver

public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {


  private StreamObserver<FooUpdateResponse> responseObserver;

  public GrpcAwareCacheEventListener(
      StreamObserver<FooUpdateResponse> responseObserver) {
    this.responseObserver = responseObserver;
  }


  @Override
  public void notifyElementPut(Ehcache cache, Element element) throws CacheException {

    Foo foo = (Foo) element.getObjectValue();
    if (foo != null) {
      responseObserver.onNext(
          FooResponse.newBuilder().setFoo(foo).build());
    }
  }
}

我的流媒体 foo 服务如下:

    public void streamFooUpdates(Empty request,
              StreamObserver<FooResponse> responseObserver) {

            final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
            fooCache.getCacheEventNotificationService().registerListener(eventListener);
            Context.current().withCancellation().addListener(new CancellationListener() {

              public void cancelled(Context context) {
    log.info("inside context cancelled callback");      
  fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
              }

            }, ForkJoinPool.commonPool());



          }

这一切都很好,只要客户端连接,客户端就会收到所有 foo 更新的通知。

但是,在客户端断开连接或显式取消调用后,我希望服务器的 Context 的取消侦听器会触发,从而取消注册缓存中的回调。

情况并非如此,无论客户端是关闭通道还是显式取消调用。(我希望服务器端取消的上下文会为这两个事件触发)。我想知道我在客户端的取消语义是否不正确,这是我的客户端代码,取自一个测试用例:

Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
        .usePlaintext().build();

    FooServiceGrpc.FooService stub = FooServiceGrpc
        .newStub(channel);


    ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
      public void onNext(FooResponse response) {
        log.info("received foo: {}", response.getFoo());
      }

      public void onError(Throwable throwable) {

      }

      public void onCompleted() {

      }

      public boolean isReady() {
        return false;
      }

      public void setOnReadyHandler(Runnable runnable) {

      }

      public void disableAutoInboundFlowControl() {

      }

      public void request(int i) {

      }

      public void setMessageCompression(boolean b) {

      }

      public void cancel(@Nullable String s, @Nullable Throwable throwable) {

      }
    };

    stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
    Thread.sleep(10000); // sleep 10 seconds while messages are received.
    cancellableObserver.cancel("cancelling from test", null); //explicit cancel
    ((ManagedChannel) chan).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.

    Thread.sleep(7000); //channel should be shutdown by now.

  }

我想知道为什么服务器没有触发“上下文取消”回调。

谢谢!

4

1 回答 1

3

您没有正确取消客户呼叫。的StreamObserver第二个参数stub.streamFooUpdates()是你的回调。你不应该为此打电话StreamObserver

有两种方法可以从客户端取消呼叫。

ClientResponseObserver选项 1:将 a作为第二个参数传递, implement beforeStart(),它为您提供 a ClientCallStreamObserver,您可以在其上调用cancel()

选项 2:stub.streamFooUpdates()在 a中运行CancellableContext,然后取消Context以取消调用。请注意,CancellableContext必须始终取消 a,这就是该finally块的用途。

CancellableContext withCancellation = Context.current().withCancellation();
try {
  withCancellation.run(() -> {
      stub.streamFooUpdates(...);
      Thread.sleep(10000);
      withCancellation.cancel(null);
  });
} finally {
  withCancellation.cancel(null);
}
于 2018-09-12T17:53:10.653 回答