0

我正在制作一个输入流率计。它基本上是一种服务,它公开请求流调用并计算每秒可以处理多少条消息。

由于客户端在发送消息时是完全异步的,所以我使用 ClientCallStreamObserver 在流准备好时开始发送消息,以避免内存溢出。

客户端代码如下所示:

public static void main(String[] args) throws Exception {
    ManagedChannel channel =  ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build();
    ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel);


    StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() {
        @Override
        public void onNext(Empty empty) {

        }

        @Override
        public void onError(Throwable throwable) {
            logger.info("on error response stream");
        }

        @Override
        public void onCompleted() {
            logger.info("on completed response stream");
        }
    });

    final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream;

    while (!clientCallObserver.isReady()) {
        Thread.sleep(2000);
        logger.info("stream not ready yet");
    }

    counter.setLastTic(System.nanoTime());

    while (true) {
        counter.inc();
        if (counter.getCounter() % 15000 == 0 ) {
            long now = System.nanoTime();
            double rate = (double) NANOSEC_TO_SEC * counter.getCounter() / (now - counter.getLastTic());
            logger.info("rate: " + rate + " msgs per sec");
            counter.clear();
            counter.setLastTic(now);
        }
        inputStream.onNext(createRandomTrade());
    }
}

我对 isReady 的观察循环永无止境。

OBS:我正在使用 kubernetes 集群来服务我的测试,服务器正在接收调用并返回 StreamObserver 实现。

4

1 回答 1

2

isReady should eventually return true, as long as the RPC doesn't error/complete immediately. But the code is not observing flow control properly.

After each call to onNext() to send a request isReady() could begin returning false. Your while (true) loop should instead have the isReady() check at the beginning of each iteration.

Instead of polling, it is better to call serverCallObserver.setOnReadyHandler(yourRunnable) to be notified when the call is ready to send. Note that you should still check isReady() within yourRunnable as there can be spurious/out-of-date notifications.

于 2017-06-20T16:43:05.557 回答