过去这是作为 Akka.NET Streams contrib 包的一部分完成的,但由于我不再看到它,让我们来看看如何实现这样的源。主题可能很长,例如:
- Akka.NET Streams 实际上是关于图形处理的——我们谈论的是多输入/多输出配置(在 Akka.NET 中它们被称为入口和出口),并支持图形中的循环。
- Akka.NET 不是建立在 .NET async/await 之上,甚至不是在 .NET 标准线程池库之上——它们都是可插拔的,这意味着最低的障碍基本上是使用回调和编码 C# 编译器有时会做的事情我们。
- Akka.NET 流能够在阶段/操作符之间推送和拉取值。
IAsyncEnumerable<T>
只能拉数据,IObservable<T>
只能推数据,所以我们在这里获得了更多的表现力,但这是有代价的。
用于实现自定义阶段的低级 API 的基础知识可以在文档中找到。
入门样板如下所示:
public static class AsyncEnumerableExtensions {
// Helper method to change IAsyncEnumerable into Akka.NET Source.
public static Source<T, NotUsed> AsSource<T>(this IAsyncEnumerable<T> source) =>
Source.FromGraph(new AsyncEnumerableSource<T>(source));
}
// Source stage is description of a part of the graph that doesn't consume
// any data, only produce it using a single output channel.
public sealed class AsyncEnumerableSource<T> : GraphStage<SourceShape<T>>
{
private readonly IAsyncEnumerable<T> _enumerable;
public AsyncEnumerableSource(IAsyncEnumerable<T> enumerable)
{
_enumerable = enumerable;
Outlet = new Outlet<T>("asyncenumerable.out");
Shape = new SourceShape<T>(Outlet);
}
public Outlet<T> Outlet { get; }
public override SourceShape<T> Shape { get; }
/// Logic if to a graph stage, what enumerator is to enumerable.
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) =>
new Logic(this);
sealed class Logic: OutGraphStageLogic
{
public override void OnPull()
{
// method called whenever a consumer asks for new data
}
public override void OnDownstreamFinish()
{
// method called whenever a consumer stage finishes,used for disposals
}
}
}
如前所述,我们在这里没有立即使用 async/await:更重要的是,Logic
在异步上下文中调用方法是不安全的。为了安全起见,我们需要注册可能从其他线程调用的方法,GetAsyncCallback<T>
并通过返回的包装器调用它们。这将确保在执行异步代码时不会发生数据竞争。
sealed class Logic : OutGraphStageLogic
{
private readonly Outlet<T> _outlet;
// enumerator we'll call for MoveNextAsync, and eventually dispose
private readonly IAsyncEnumerator<T> _enumerator;
// callback called whenever _enumerator.MoveNextAsync completes asynchronously
private readonly Action<Task<bool>> _onMoveNext;
// callback called whenever _enumerator.DisposeAsync completes asynchronously
private readonly Action<Task> _onDisposed;
// cache used for errors thrown by _enumerator.MoveNextAsync, that
// should be rethrown after _enumerator.DisposeAsync
private Exception? _failReason = null;
public Logic(AsyncEnumerableSource<T> source) : base(source.Shape)
{
_outlet = source.Outlet;
_enumerator = source._enumerable.GetAsyncEnumerator();
_onMoveNext = GetAsyncCallback<Task<bool>>(OnMoveNext);
_onDisposed = GetAsyncCallback<Task>(OnDisposed);
}
// ... other methods
}
最后一部分是在 `Logic: 上重写的方法:
OnPull
每当下游阶段需要新数据时使用。这里我们需要调用异步枚举器序列的下一个元素。
OnDownstreamFinish
每当下游阶段完成并且不会要求任何新数据时调用。这是我们处置我们的枚举器的地方。
问题是这些方法不是 async/await,而它们的枚举器等价物是。我们基本上需要做的是:
- 调用底层枚举器(
OnPull
→MoveNextAsync
和OnDownstreamFinish
→ DisposeAsync
)的相应异步方法。
- 看,如果我们可以立即获得他们的结果 - 这是通常作为 C# 编译器的一部分在 async/await 调用中为我们完成的重要部分。
- 如果不是,我们需要等待结果 - 调用
ContinueWith
来注册我们的回调包装器,以便在异步方法完成后调用。
sealed class Logic : OutGraphStageLogic
{
// ... constructor and fields
public override void OnPull()
{
var hasNext = _enumerator.MoveNextAsync();
if (hasNext.IsCompletedSuccessfully)
{
// first try short-path: values is returned immediately
if (hasNext.Result)
// check if there was next value and push it downstream
Push(_outlet, _enumerator.Current);
else
// if there was none, we reached end of async enumerable
// and we can dispose it
DisposeAndComplete();
}
else
// we need to wait for the result
hasNext.AsTask().ContinueWith(_onMoveNext);
}
// This method is called when another stage downstream has been completed
public override void OnDownstreamFinish() =>
// dispose enumerator on downstream finish
DisposeAndComplete();
private void DisposeAndComplete()
{
var disposed = _enumerator.DisposeAsync();
if (disposed.IsCompletedSuccessfully)
{
// enumerator disposal completed immediately
if (_failReason is not null)
// if we close this stream in result of error in MoveNextAsync,
// fail the stage
FailStage(_failReason);
else
// we can close the stage with no issues
CompleteStage();
}
else
// we need to await for enumerator to be disposed
disposed.AsTask().ContinueWith(_onDisposed);
}
private void OnMoveNext(Task<bool> task)
{
// since this is callback, it will always be completed, we just need
// to check for exceptions
if (task.IsCompletedSuccessfully)
{
if (task.Result)
// if task returns true, it means we read a value
Push(_outlet, _enumerator.Current);
else
// otherwise there are no more values to read and we can close the source
DisposeAndComplete();
}
else
{
// task either failed or has been cancelled
_failReason = task.Exception as Exception ?? new TaskCanceledException(task);
FailStage(_failReason);
}
}
private void OnDisposed(Task task)
{
if (task.IsCompletedSuccessfully) CompleteStage();
else {
var reason = task.Exception as Exception
?? _failReason
?? new TaskCanceledException(task);
FailStage(reason);
}
}
}