-3

我有一个异步的消息序列(流),有时会多次到达,有时会偶尔到达,我想分批处理它们,每批 10 条消息。我还想对接收消息和处理消息之间的延迟实施上限,因此如果在接收到批处理的第一条消息后经过 5 秒,还应该处理少于 10 条消息的批处理。我发现我可以通过使用System.Interactive.Async包中的Buffer运算符来解决问题的第一部分:

IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}

运营商签名Buffer

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);

不幸的是,Buffer运算符没有TimeSpan参数过载,所以我不能这么容易地解决问题的第二部分。我必须自己实现一个带有计时器的批处理操作符。我的问题是:如何实现Buffer具有以下签名的运算符的变体?

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);

timeSpan参数应影响Buffer操作员的行为,如下所示:

  1. timeSpan在发出前一批之后(或最初在调用方法之后)已经过去时,必须发出一个批次Buffer
  2. timeSpan如果在发出前一个批次之后已经过去,并且在此期间没有收到任何消息,则必须发出一个空批次。
  3. 比每个都更频繁地发出批次timeSpan意味着批次已满。在过去之前发出少于count消息的批次timeSpan是不可取的。

如果需要,我可以向我的项目添加外部依赖项,例如System.Interactive.AsyncSystem.Linq.Async包。

PS这个问题的灵感来自最近一个与通道和内存泄漏有关的问题。

4

2 回答 2

0

使用 aChannel来实现所需的功能怎么样?如果使用类似这种扩展方法的方法从队列中读取直到超时到期,是否有任何缺陷?

public static async Task<List<T>> ReadWithTimeoutAsync<T>(this ChannelReader<T> reader, TimeSpan readTOut, CancellationToken cancellationToken)
{
    var timeoutTokenSrc = new CancellationTokenSource();
    timeoutTokenSrc.CancelAfter(readTOut);

    var messages = new List<T>();

    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, cancellationToken))
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
            {
                messages.Add(item);
                linkedCts.Token.ThrowIfCancellationRequested();
            }

            Console.WriteLine("All messages read.");
        }
        catch (OperationCanceledException)
        {
            if (timeoutTokenSrc.Token.IsCancellationRequested)
            {
                Console.WriteLine($"Delay ({readTOut.Milliseconds} msec) for reading items from message channel has expired.");
            }
            else if (cancellationToken.IsCancellationRequested)
            {
                Console.WriteLine("Cancelling per user request.");
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    timeoutTokenSrc.Dispose();

    return messages;
}

将超时与最大值结合起来。批量大小,可以再添加一个令牌源:

public static async Task<List<T>> ReadBatchWithTimeoutAsync<T>(this ChannelReader<T> reader, int maxBatchSize, TimeSpan readTOut, CancellationToken cancellationToken)
{
    var timeoutTokenSrc = new CancellationTokenSource();
    timeoutTokenSrc.CancelAfter(readTOut);
    var maxSizeTokenSrc = new CancellationTokenSource();

    var messages = new List<T>();

    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, maxSizeTokenSrc.Token, cancellationToken))
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
            {
                messages.Add(item);
                if (messages.Count >= maxBatchSize)
                {
                    maxSizeTokenSrc.Cancel();
                }
                linkedCts.Token.ThrowIfCancellationRequested();
            }....
于 2021-11-03T10:56:02.333 回答
-1

这里有两种解决这个问题的方法。第一个是有缺陷的,但我还是发布了它,因为它非常简单。System.Reactive包中已经存在Buffer带参数的运算符,而System.Linq.Async包中存在异步序列和可观察序列之间的转换器。因此,只需将三个已经可用的运算符链接在一起即可:TimeSpan

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
    return source.ToObservable().Buffer(timeSpan, count).ToAsyncEnumerable();
}

不幸的是,这种巧妙的方法是有缺陷的,因为从拉模式转变为推模式再回到拉模式会产生副作用。发生的情况是,中间的 observable 序列在订阅时会开始积极地拉取 source IAsyncEnumerable,而不管结果IAsyncEnumerable是如何拉取的。因此,不是结果序列的消费者成为枚举的驱动程序,而是枚举以源序列允许的最大速度在后台静默发生,并且产生的消息被缓冲在内部队列中。因此,不仅有可能对消息的处理施加隐藏的延迟,而且内存消耗也有可能飙升至失控。

第二种是动手方法,将Task.Delay方法用作计时器,以及Task.WhenAny协调计时器和枚举任务的方法。这种方法的行为类似于基于 Rx 的方法,除了源序列的枚举是由结果序列的消费者驱动的,正如人们所期望的那样。

/// <summary>
/// Splits the elements of a sequence into chunks that are sent out when either
/// they're full, or a given amount of time has elapsed.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (timeSpan < TimeSpan.Zero) throw new ArgumentNullException(nameof(timeSpan));
    if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
    return Implementation();

    async IAsyncEnumerable<IList<TSource>> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var timerCts = new CancellationTokenSource();
        var delayTask = Task.Delay(timeSpan, timerCts.Token);
        (ValueTask<bool> ValueTask, Task<bool> Task) moveNext = default;
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        try
        {
            moveNext = (enumerator.MoveNextAsync(), null);
            var buffer = new List<TSource>(count);
            ExceptionDispatchInfo error = null;
            while (true)
            {
                Task completedTask = null;
                if (!moveNext.ValueTask.IsCompleted)
                {
                    // Preserve the ValueTask, if it's not preserved already.
                    if (moveNext.Task == null)
                    {
                        var preserved = moveNext.ValueTask.AsTask();
                        moveNext = (new ValueTask<bool>(preserved), preserved);
                    }
                    completedTask = await Task.WhenAny(moveNext.Task, delayTask)
                        .ConfigureAwait(false);
                }
                if (completedTask == delayTask)
                {
                    Debug.Assert(delayTask.IsCompleted);
                    yield return buffer.ToArray(); // It's OK if the buffer is empty.
                    buffer.Clear();
                    delayTask = Task.Delay(timeSpan, timerCts.Token);
                }
                else
                {
                    Debug.Assert(moveNext.ValueTask.IsCompleted);
                    // Await a copy, to prevent a second await on finally.
                    var moveNextCopy = moveNext.ValueTask;
                    moveNext = default;
                    bool moved;
                    try { moved = await moveNextCopy.ConfigureAwait(false); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                    if (!moved) break;
                    buffer.Add(enumerator.Current);
                    if (buffer.Count == count)
                    {
                        timerCts.Cancel(); timerCts.Dispose();
                        timerCts = new CancellationTokenSource();
                        yield return buffer.ToArray();
                        buffer.Clear();
                        delayTask = Task.Delay(timeSpan, timerCts.Token);
                    }
                    try { moveNext = (enumerator.MoveNextAsync(), null); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                }
            }
            if (buffer.Count > 0) yield return buffer.ToArray();
            error?.Throw();
        }
        finally
        {
            // The finally runs when an enumerator created by this method is disposed.
            timerCts.Cancel(); timerCts.Dispose();
            // Prevent fire-and-forget, otherwise the DisposeAsync() might throw.
            // Swallow MoveNextAsync errors, but propagate DisposeAsync errors.
            try { await moveNext.ValueTask.ConfigureAwait(false); } catch { }
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}

已注意避免泄漏即发即弃MoveNextAsync操作或计时器。

包装器的分配Task仅在MoveNextAsync调用返回 non-completed时发生ValueTask<bool>

此实现是非破坏性的,这意味着从源序列中消耗的任何元素都不会丢失。如果源序列失败或枚举被取消,任何缓冲的元素都将在错误传播之前发出。

于 2021-05-24T17:26:04.117 回答