2

我已经弄清楚如何创建一元 RPC 拦截器,但我不知道如何制作流式 RPC 拦截器。这是我迄今为止所拥有的:

public override async Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
        IAsyncStreamReader<TRequest> requestStream, ServerCallContext context,
        ClientStreamingServerMethod<TRequest, TResponse> continuation)
    {
        Console.WriteLine("ClientStreaming");
        var response = await base.ClientStreamingServerHandler(requestStream, context, continuation);
        return response;
    }

每次启动客户端流时,此代码都会截断控制台日志,我只是不知道如何控制台记录每条传入的客户端消息。

亲切的问候杰西

4

2 回答 2

2

我最近遇到了类似的情况,以下是我如何使用拦截器解决它。我们需要通过我们的 gRPC API 测量消息吞吐量/消息大小。一元调用非常简单,但遇到了您在流式传输方面遇到的问题。以下是我为拦截流所做的事情(在我的情况下,服务器应该与您的场景类似)。

第一段代码是您已经拥有的拦截器方法(这是服务器流式传输)

    public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, 
                                                                                                    ClientInterceptorContext<TRequest, TResponse> context, 
                                                                                                    AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
  {
     // Add the outgoing message size to the metrics
     mCollector.Add(mInterceptorName,context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.OUT, request as IMessage);

     // This call returns the server stream, among other things and the server stream reader
     // is returned for the client to consume.
     var prelimResponse = base.AsyncServerStreamingCall(request, context, continuation);

     // Add the result message size to the metrics
     mCollector.Add(mInterceptorName, context.Method.ServiceName, context.Method.Name, NetworkMetricsCollectionService.DIRECTION.IN, prelimResponse as IMessage);
     // Wrap the response stream object with our implementation that will log the size and then
     // proxy that to the client.

     var response = new AsyncServerStreamingCall<TResponse>(new AsyncStreamReaderWrapper<TResponse>(prelimResponse.ResponseStream,
                                                                                                    mInterceptorName,
                                                                                                    context.Method.ServiceName, 
                                                                                                    context.Method.Name,mCollector), 
                                                                                                    prelimResponse.ResponseHeadersAsync, 
                                                                                                    prelimResponse.GetStatus, 
                                                                                                    prelimResponse.GetTrailers, 
                                                                                                    prelimResponse.Dispose);
     // return the wrapped stream to the client
     return response;
  }

AsyncServerStreamReaderWrapper 实现接收对象、测量、记录其大小,然后将其传递给客户端。这个包装器是必需的,因为 Stream 阅读器只能有一个使用者,如果我有多个阅读器会产生错误,这是有道理的。

   /// <summary>
   /// Wrapper class around the gRPC AsyncStreamReader class that allows retrieval of the object 
   /// before handing off to the client for the purpose of throughput measurements and metrics
   /// collection
   /// </summary>
   /// <typeparam name="T">type of object contained within the stream</typeparam>
   public class AsyncStreamReaderWrapper<T> : IAsyncStreamReader<T>
   {
      private IAsyncStreamReader<T> mInnerImplementation = null;
      private NetworkMetricsCollectionService mCollector = null;
      private string mId = string.Empty;
      private string mService = string.Empty;
      private string mMethod = string.Empty;

      public T Current => mInnerImplementation.Current;

      /// <summary>
      /// Advances the reader to the next element in the sequence, returning the result asynchronously.
      /// </summary>
      /// <param name="cancellationToken">Cancellation token that can be used to cancel the 
      /// operation.</param>
      /// <returns>Task containing the result of the operation: true if the reader was successfully
      /// advanced to the next element; false if the reader has passed the end of the sequence.</returns>
      public async Task<bool> MoveNext(CancellationToken cancellationToken)
      {
         bool result = await mInnerImplementation.MoveNext(cancellationToken);
         if (result)
         {
            mCollector.Add(mId,mService, mMethod, NetworkMetricsCollectionService.DIRECTION.IN, Current as IMessage);
         }
         return result;
      }

      /// <summary>
      /// Parameterized Constructor
      /// </summary>
      /// <param name="aInnerStream">inner stream reader to wrap</param>
      /// <param name="aService">service name for metrics reporting</param>
      /// <param name="aMethod">method name for metrics reporting</param>
      /// <param name="aCollector">metrics collector</param>
      public AsyncStreamReaderWrapper(IAsyncStreamReader<T> aInnerStream, string aId, string aService, string aMethod, NetworkMetricsCollectionService aCollector)
      {
         mInnerImplementation = aInnerStream;
         mId = aId;
         mService = aService;
         mMethod = aMethod;
         mCollector = aCollector;
      }
   }

我知道这不是您正在寻找的确切场景,但我相信您的实现将是相似的,但使用 StreamWriter 而不是 StreamReader 并且没有用于尝试测量消息大小的位。

于 2020-06-15T12:30:00.963 回答
0

您还可以查看 GRPC 示例 - https://github.com/grpc/grpc-dotnet/tree/master/examples

Github中有很多gRPC场景

于 2022-02-06T09:09:34.693 回答