这里有两种解决这个问题的方法。第一个是有缺陷的,但我还是发布了它,因为它非常简单。System.Reactive包中已经存在Buffer
带参数的运算符,而System.Linq.Async包中存在异步序列和可观察序列之间的转换器。因此,只需将三个已经可用的运算符链接在一起即可:TimeSpan
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
return source.ToObservable().Buffer(timeSpan, count).ToAsyncEnumerable();
}
不幸的是,这种巧妙的方法是有缺陷的,因为从拉模式转变为推模式再回到拉模式会产生副作用。发生的情况是,中间的 observable 序列在订阅时会开始积极地拉取 source IAsyncEnumerable
,而不管结果IAsyncEnumerable
是如何拉取的。因此,不是结果序列的消费者成为枚举的驱动程序,而是枚举以源序列允许的最大速度在后台静默发生,并且产生的消息被缓冲在内部队列中。因此,不仅有可能对消息的处理施加隐藏的延迟,而且内存消耗也有可能飙升至失控。
第二种是动手方法,将Task.Delay
方法用作计时器,以及Task.WhenAny
协调计时器和枚举任务的方法。这种方法的行为类似于基于 Rx 的方法,除了源序列的枚举是由结果序列的消费者驱动的,正如人们所期望的那样。
/// <summary>
/// Splits the elements of a sequence into chunks that are sent out when either
/// they're full, or a given amount of time has elapsed.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (timeSpan < TimeSpan.Zero) throw new ArgumentNullException(nameof(timeSpan));
if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
return Implementation();
async IAsyncEnumerable<IList<TSource>> Implementation(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var timerCts = new CancellationTokenSource();
var delayTask = Task.Delay(timeSpan, timerCts.Token);
(ValueTask<bool> ValueTask, Task<bool> Task) moveNext = default;
var enumerator = source.GetAsyncEnumerator(cancellationToken);
try
{
moveNext = (enumerator.MoveNextAsync(), null);
var buffer = new List<TSource>(count);
ExceptionDispatchInfo error = null;
while (true)
{
Task completedTask = null;
if (!moveNext.ValueTask.IsCompleted)
{
// Preserve the ValueTask, if it's not preserved already.
if (moveNext.Task == null)
{
var preserved = moveNext.ValueTask.AsTask();
moveNext = (new ValueTask<bool>(preserved), preserved);
}
completedTask = await Task.WhenAny(moveNext.Task, delayTask)
.ConfigureAwait(false);
}
if (completedTask == delayTask)
{
Debug.Assert(delayTask.IsCompleted);
yield return buffer.ToArray(); // It's OK if the buffer is empty.
buffer.Clear();
delayTask = Task.Delay(timeSpan, timerCts.Token);
}
else
{
Debug.Assert(moveNext.ValueTask.IsCompleted);
// Await a copy, to prevent a second await on finally.
var moveNextCopy = moveNext.ValueTask;
moveNext = default;
bool moved;
try { moved = await moveNextCopy.ConfigureAwait(false); }
catch (Exception ex)
{
error = ExceptionDispatchInfo.Capture(ex); break;
}
if (!moved) break;
buffer.Add(enumerator.Current);
if (buffer.Count == count)
{
timerCts.Cancel(); timerCts.Dispose();
timerCts = new CancellationTokenSource();
yield return buffer.ToArray();
buffer.Clear();
delayTask = Task.Delay(timeSpan, timerCts.Token);
}
try { moveNext = (enumerator.MoveNextAsync(), null); }
catch (Exception ex)
{
error = ExceptionDispatchInfo.Capture(ex); break;
}
}
}
if (buffer.Count > 0) yield return buffer.ToArray();
error?.Throw();
}
finally
{
// The finally runs when an enumerator created by this method is disposed.
timerCts.Cancel(); timerCts.Dispose();
// Prevent fire-and-forget, otherwise the DisposeAsync() might throw.
// Swallow MoveNextAsync errors, but propagate DisposeAsync errors.
try { await moveNext.ValueTask.ConfigureAwait(false); } catch { }
await enumerator.DisposeAsync().ConfigureAwait(false);
}
}
}
已注意避免泄漏即发即弃MoveNextAsync
操作或计时器。
包装器的分配Task
仅在MoveNextAsync
调用返回 non-completed时发生ValueTask<bool>
。
此实现是非破坏性的,这意味着从源序列中消耗的任何元素都不会丢失。如果源序列失败或枚举被取消,任何缓冲的元素都将在错误传播之前发出。