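I ran into a similar situation recently, and here is how I solved it with an interceptor. We needed to measure message throughput and message size through our gRPC API. Unary calls were straightforward, but I hit the same problem you are having with streaming. Below is what I did to intercept the streams (server streaming in my case, which should be similar to your scenario).

The first piece of code is the interceptor method you already have (this one is for server streaming):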
    // Needs: using Google.Protobuf; using Grpc.Core; using Grpc.Core.Interceptors;
    public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
        TRequest request,
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
    {
        // Add the outgoing message size to the metrics
        mCollector.Add(mInterceptorName, context.Method.ServiceName, context.Method.Name,
            NetworkMetricsCollectionService.DIRECTION.OUT, request as IMessage);

        // This call returns the call object, which among other things holds the
        // server stream reader that the client will consume.
        var prelimResponse = base.AsyncServerStreamingCall(request, context, continuation);

        // Incoming sizes are not recorded here: prelimResponse is the call object, not a
        // message. Each response is measured by the wrapper below as it is read.

        // Wrap the response stream object with our implementation that will log the size and then
        // proxy that to the client.
        var response = new AsyncServerStreamingCall<TResponse>(
            new AsyncStreamReaderWrapper<TResponse>(
                prelimResponse.ResponseStream,
                mInterceptorName,
                context.Method.ServiceName,
                context.Method.Name,
                mCollector),
            prelimResponse.ResponseHeadersAsync,
            prelimResponse.GetStatus,
            prelimResponse.GetTrailers,
            prelimResponse.Dispose);

        // Return the wrapped stream to the client
        return response;
    }

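The AsyncStreamReaderWrapper implementation receives each object, measures and records its size, and then passes it on to the client. The wrapper is necessary because a stream reader can only have a single consumer; attaching more than one reader produces an error, which makes sense.
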
    using System.Threading;
    using System.Threading.Tasks;
    using Google.Protobuf;
    using Grpc.Core;

    /// <summary>
    /// Wrapper class around the gRPC AsyncStreamReader class that allows retrieval of the object
    /// before handing off to the client for the purpose of throughput measurements and metrics
    /// collection
    /// </summary>
    /// <typeparam name="T">type of object contained within the stream</typeparam>
    public class AsyncStreamReaderWrapper<T> : IAsyncStreamReader<T>
    {
        private readonly IAsyncStreamReader<T> mInnerImplementation;
        private readonly NetworkMetricsCollectionService mCollector;
        private readonly string mId;
        private readonly string mService;
        private readonly string mMethod;

        /// <summary>
        /// Parameterized Constructor
        /// </summary>
        /// <param name="aInnerStream">inner stream reader to wrap</param>
        /// <param name="aId">interceptor name/id for metrics reporting</param>
        /// <param name="aService">service name for metrics reporting</param>
        /// <param name="aMethod">method name for metrics reporting</param>
        /// <param name="aCollector">metrics collector</param>
        public AsyncStreamReaderWrapper(IAsyncStreamReader<T> aInnerStream, string aId, string aService, string aMethod, NetworkMetricsCollectionService aCollector)
        {
            mInnerImplementation = aInnerStream;
            mId = aId;
            mService = aService;
            mMethod = aMethod;
            mCollector = aCollector;
        }

        /// <summary>
        /// The element most recently read from the inner stream.
        /// </summary>
        public T Current => mInnerImplementation.Current;

        /// <summary>
        /// Advances the reader to the next element in the sequence, returning the result asynchronously.
        /// </summary>
        /// <param name="cancellationToken">Cancellation token that can be used to cancel the
        /// operation.</param>
        /// <returns>Task containing the result of the operation: true if the reader was successfully
        /// advanced to the next element; false if the reader has passed the end of the sequence.</returns>
        public async Task<bool> MoveNext(CancellationToken cancellationToken)
        {
            bool result = await mInnerImplementation.MoveNext(cancellationToken);
            if (result)
            {
                // Record the size of the message that was just received
                mCollector.Add(mId, mService, mMethod, NetworkMetricsCollectionService.DIRECTION.IN, Current as IMessage);
            }
            return result;
        }
    }

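I know this is not the exact scenario you are looking for, but I believe your implementation will be similar, except that it wraps the stream writer instead of the stream reader and leaves out the bits that measure message size.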
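
For what it's worth, here is a rough sketch of what that writer-side version might look like on the server. It is untested against your setup and only illustrates the shape of the idea. `ServerStreamWriterWrapper`, `StreamObservingInterceptor` and the `Action<T>` callback are names I made up for illustration; the only things taken from Grpc.Core are `Interceptor.ServerStreamingServerHandler`, `IServerStreamWriter<T>`, `WriteOptions` and `ServerCallContext`.

```csharp
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Interceptors;

// Hypothetical wrapper: invokes a callback for every outgoing message and
// then forwards the write to the real stream writer.
public class ServerStreamWriterWrapper<T> : IServerStreamWriter<T>
{
    private readonly IServerStreamWriter<T> mInner;
    private readonly Action<T> mOnWrite;

    public ServerStreamWriterWrapper(IServerStreamWriter<T> aInner, Action<T> aOnWrite)
    {
        mInner = aInner;
        mOnWrite = aOnWrite;
    }

    // Pass the write options straight through to the wrapped writer.
    public WriteOptions WriteOptions
    {
        get => mInner.WriteOptions;
        set => mInner.WriteOptions = value;
    }

    public Task WriteAsync(T message)
    {
        // Observe the message first, then hand it to the real writer.
        mOnWrite(message);
        return mInner.WriteAsync(message);
    }
}

// Hypothetical server-side interceptor that swaps in the wrapper.
public class StreamObservingInterceptor : Interceptor
{
    public override Task ServerStreamingServerHandler<TRequest, TResponse>(
        TRequest request,
        IServerStreamWriter<TResponse> responseStream,
        ServerCallContext context,
        ServerStreamingServerMethod<TRequest, TResponse> continuation)
    {
        var wrapped = new ServerStreamWriterWrapper<TResponse>(
            responseStream,
            message => Console.WriteLine($"{context.Method}: sending a {typeof(TResponse).Name}"));

        // The service implementation now writes to the wrapper instead of the raw stream.
        return continuation(request, wrapped, context);
    }
}
```

The callback is where you would put whatever you want to do with each message; here it only logs. Newer Grpc.Core.Api versions also appear to offer a `WriteAsync` overload that takes a `CancellationToken`, so if your service code uses it you would probably want to forward that one in the wrapper as well.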