0

我想将 IAsyncEnumerable 用作 Akka 流的 Source,但没有找到实现的办法。

此代码的 Source 类中没有合适的方法。

using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Streams.Dsl;

// Minimal program from the question: it tries to feed an IAsyncEnumerable<int>
// into an Akka.Streams Source.
namespace ConsoleApp1
{
    class Program
    {
        // Entry point. The Source.From(...) call below is the line the question is about.
        // NOTE(review): IAsyncEnumerable<T> is not awaitable, and Source.From presumably
        // expects a synchronous IEnumerable<T> - confirm against the Akka.Streams API.
        // The stream definition is left unfinished on purpose (placeholders).
        static async Task Main(string[] args)
        { 
            Source.From(await AsyncEnumerable())
                .Via(/*some action*/)
                //.....
        }

        // Placeholder async iterator standing in for the real asynchronous data producer;
        // its body is intentionally omitted in the question.
        private static async IAsyncEnumerable<int> AsyncEnumerable()
        {
            //some async enumerable
        }
    }
}

如何将 IAsyncEnumerable 用于 Source?

4

3 回答 3

3

从 Akka.NET v1.4.30 开始,Akka.Streams 已通过下面的 RunAsAsyncEnumerable 方法原生支持这一点:

// Demonstrates that a Source materialized with RunAsAsyncEnumerable can be consumed
// with `await foreach`, and that cancelling the token passed to WithCancellation
// surfaces as an OperationCanceledException in the consumer.
var input = Enumerable.Range(1, 6).ToList();

// Dispose the token source when the snippet finishes.
using var cts = new CancellationTokenSource();
var token = cts.Token;

var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
bool caught = false;
try
{
    // The element value is irrelevant here; only the cancellation matters.
    await foreach (var _ in asyncEnumerable.WithCancellation(token))
    {
        // Cancel while the enumeration is in progress.
        cts.Cancel();
    }
}
catch (OperationCanceledException)
{
    caught = true;
}

caught.ShouldBeTrue();

我从 Akka.NET 测试套件中复制了该示例,以防您想知道。

于 2021-12-27T15:04:51.780 回答
2

您还可以使用现有的原语来流式传输大量数据。下面是一个使用 Source.UnfoldAsync 流式传输分页数据的示例(本例通过 Octokit 获取 GitHub 存储库),直到没有更多数据为止。

// Streams repository pages one at a time. The unfold state is the number of the page
// to request next; every fetched page is emitted downstream as one element.
var source = Source.UnfoldAsync<int, RepositoryPage>(startPage, async pageNumber =>
{
    // Awaiting (instead of ContinueWith + task.Result) lets a failed request surface
    // its original exception rather than an AggregateException wrapper.
    var fetched = await client.GetRepositoriesAsync(pageNumber, pageSize);

    // Stop only once the requested page STARTS at or beyond the total result count.
    // Comparing the END of the page (PageNumber * pageSize > Total) would return None
    // for a trailing, partially filled page, so that page would never be emitted.
    if ((long)(fetched.PageNumber - 1) * pageSize >= fetched.Total)
    {
        return Option<(int, RepositoryPage)>.None;
    }

    // Emit the page and continue with the following page number.
    return new Option<(int, RepositoryPage)>((fetched.PageNumber + 1, fetched));
});

运行

// Creates the actor system and materializer, runs the paged source into a console
// sink and shuts the actor system down once the stream has finished.
using var sys = ActorSystem.Create("system");
using var mat = sys.Materializer();

int startPage = 1;
int pageSize = 50;

var client = new GitHubClient(new ProductHeaderValue("github-search-app"));

var source = ...

var sink = Sink.ForEach<RepositoryPage>(Console.WriteLine);

var result = source.RunWith(sink, mat);
try
{
    // Await the stream itself so a stream failure is observed here instead of being
    // silently dropped by a continuation.
    await result;
}
finally
{
    // Await the termination task too: ContinueWith(_ => sys.Terminate()) only awaited
    // the outer continuation, not the shutdown it started.
    await sys.Terminate();
}
/// <summary>
/// One page of a paged result set: the items on the page, the page number and the
/// total count reported for the whole result set.
/// </summary>
/// <typeparam name="T">Type of the items carried by the page.</typeparam>
class Page<T>
{
    /// <summary>Creates a page from its items, its page number and the overall total.</summary>
    /// <param name="contents">Items contained in this page.</param>
    /// <param name="page">Number of this page.</param>
    /// <param name="total">Total count stored alongside the page.</param>
    public Page(IReadOnlyList<T> contents, int page, long total)
    {
        Contents = contents;
        PageNumber = page;
        Total = total;
    }

    // No property initializer: the only constructor always assigns Contents, so a
    // default list would be allocated just to be discarded.
    /// <summary>Items contained in this page.</summary>
    public IReadOnlyList<T> Contents { get; set; }

    /// <summary>Number of this page.</summary>
    public int PageNumber { get; set; }

    /// <summary>Total count stored alongside the page.</summary>
    public long Total { get; set; }
}

/// <summary>
/// A page of repositories that renders itself as the page number followed by one
/// repository name per line.
/// </summary>
class RepositoryPage : Page<Repository>
{
    public RepositoryPage(IReadOnlyList<Repository> contents, int page, long total)
        : base(contents, page, total)
    {
    }

    /// <summary>Formats the page header and the name of every repository it contains.</summary>
    public override string ToString()
    {
        // Each name is terminated by a newline and the pieces are concatenated as-is.
        var names = string.Concat(Contents.Select(repo => repo.Name + "\n"));
        return $"Page {PageNumber}\n{names}";
    }
}

/// <summary>Paging helpers for <c>GitHubClient</c>.</summary>
static class GitHubClientExtensions
{
    /// <summary>
    /// Runs a repository search and wraps one page of its results in a
    /// <see cref="RepositoryPage"/>.
    /// </summary>
    /// <param name="client">Client used to run the search.</param>
    /// <param name="page">Page number to request.</param>
    /// <param name="size">Number of results requested per page.</param>
    /// <param name="searchTerm">
    /// Term to search for. Defaults to "bootstrap", the value that used to be hard-coded,
    /// so existing callers keep their behavior.
    /// </param>
    /// <returns>The requested page together with the total count reported by the search.</returns>
    public static async Task<RepositoryPage> GetRepositoriesAsync(
        this GitHubClient client, int page, int size, string searchTerm = "bootstrap")
    {
        var request = new SearchRepositoriesRequest(searchTerm)
        {
            Page = page,
            PerPage = size
        };

        var result = await client.Search.SearchRepo(request);
        return new RepositoryPage(result.Items, page, result.TotalCount);
    }
}
于 2022-01-11T07:47:12.310 回答
2

过去这是作为 Akka.NET Streams contrib 包的一部分完成的,但由于我不再看到它,让我们来看看如何实现这样的源。主题可能很长,例如:

  1. Akka.NET Streams 实际上是关于图形处理的——我们谈论的是多输入/多输出配置(在 Akka.NET 中它们被称为入口和出口),并支持图形中的循环。
  2. Akka.NET 并非构建在 .NET async/await 之上,甚至也不是构建在 .NET 标准线程池库之上——两者都是可插拔的。这意味着最低门槛基本上是使用回调,并手动编写 C# 编译器有时会替我们生成的代码。
  3. Akka.NET 流能够在阶段/操作符之间推送和拉取值。IAsyncEnumerable<T>只能拉数据,IObservable<T>只能推数据,所以我们在这里获得了更多的表现力,但这是有代价的。

用于实现自定义阶段的低级 API 的基础知识可以在文档中找到。

入门样板如下所示:

public static class AsyncEnumerableExtensions
{
    /// <summary>
    /// Wraps an <see cref="IAsyncEnumerable{T}"/> in an <see cref="AsyncEnumerableSource{T}"/>
    /// stage and exposes it as an Akka.NET source.
    /// </summary>
    /// <param name="source">Async sequence to expose as a stream source.</param>
    public static Source<T, NotUsed> AsSource<T>(this IAsyncEnumerable<T> source)
    {
        var stage = new AsyncEnumerableSource<T>(source);
        return Source.FromGraph(stage);
    }
}

// A source stage describes a part of the graph that does not consume any data;
// it only produces data through a single output channel.
public sealed class AsyncEnumerableSource<T> : GraphStage<SourceShape<T>>
{
    // Async sequence this stage was created for.
    private readonly IAsyncEnumerable<T> _enumerable;

    // Stores the sequence and builds the stage shape around one named outlet.
    public AsyncEnumerableSource(IAsyncEnumerable<T> enumerable)
    {
        _enumerable = enumerable;
        Outlet = new Outlet<T>("asyncenumerable.out");
        Shape = new SourceShape<T>(Outlet);
    }

    // The single outlet of this stage.
    public Outlet<T> Outlet { get; }
    // Shape built from the outlet above.
    public override SourceShape<T> Shape { get; }

    /// Logic is to a graph stage what an enumerator is to an enumerable.
    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => 
        new Logic(this);

    // Starter skeleton of the stage logic: both handlers are still empty and no
    // constructor is declared in this snippet.
    sealed class Logic: OutGraphStageLogic
    {
        public override void OnPull()
        {
            // called whenever a consumer asks for new data
        }

        public override void OnDownstreamFinish() 
        {
            // called whenever a consumer stage finishes; used for disposal
        }
    }
}

如前所述,我们在这里没有直接使用 async/await。更重要的是,在异步上下文中调用 Logic 的方法是不安全的。为了安全起见,我们需要用 GetAsyncCallback<T> 注册可能从其他线程调用的方法,并通过它返回的包装器来调用它们。这样可以确保在执行异步代码时不会发生数据竞争。

sealed class Logic : OutGraphStageLogic
{
    /// <summary>Outlet taken from the owning source stage.</summary>
    private readonly Outlet<T> _outlet;

    /// <summary>Enumerator used for MoveNextAsync calls and eventually disposed.</summary>
    private readonly IAsyncEnumerator<T> _enumerator;

    /// <summary>Wrapped callback for a MoveNextAsync call that completes asynchronously.</summary>
    private readonly Action<Task<bool>> _onMoveNext;

    /// <summary>Wrapped callback for a DisposeAsync call that completes asynchronously.</summary>
    private readonly Action<Task> _onDisposed;

    /// <summary>
    /// Error thrown by MoveNextAsync, cached so it can be rethrown after DisposeAsync.
    /// Stays null while no error has been recorded.
    /// </summary>
    private Exception? _failReason;

    public Logic(AsyncEnumerableSource<T> source) : base(source.Shape)
    {
        // Wrap both handlers through GetAsyncCallback before they are handed to
        // task continuations.
        _onDisposed = GetAsyncCallback<Task>(OnDisposed);
        _onMoveNext = GetAsyncCallback<Task<bool>>(OnMoveNext);

        _enumerator = source._enumerable.GetAsyncEnumerator();
        _outlet = source.Outlet;
    }

    // ... other methods
}

最后一部分是在 Logic 上重写的方法:

  • OnPull每当下游阶段需要新数据时使用。这里我们需要调用异步枚举器序列的下一个元素。
  • OnDownstreamFinish每当下游阶段完成并且不会要求任何新数据时调用。这是我们处置我们的枚举器的地方。

问题是这些方法不是 async/await,而它们的枚举器等价物是。我们基本上需要做的是:

  1. 调用底层枚举器相应的异步方法(OnPull 对应 MoveNextAsync,OnDownstreamFinish 对应 DisposeAsync)。
  2. 检查是否可以立即获得它们的结果——在 async/await 调用中,这一重要步骤通常由 C# 编译器替我们完成。
  3. 如果不是,我们需要等待结果 - 调用ContinueWith来注册我们的回调包装器,以便在异步方法完成后调用。
sealed class Logic : OutGraphStageLogic
{
    // ... constructor and fields

    /// <summary>
    /// Called whenever downstream asks for the next element: advances the enumerator
    /// and either pushes the element, finishes the stage, or waits for the result.
    /// </summary>
    public override void OnPull()
    {
        var hasNext = _enumerator.MoveNextAsync();
        if (hasNext.IsCompletedSuccessfully)
        {
            // Short path: the result is already available, no continuation needed.
            if (hasNext.Result)
            {
                // There was a next value: push it downstream.
                Push(_outlet, _enumerator.Current);
            }
            else
            {
                // End of the async enumerable: dispose it and close the stage.
                DisposeAndComplete();
            }
        }
        else
        {
            // Not finished yet (or finished with a failure): let the wrapped callback
            // handle the outcome.
            hasNext.AsTask().ContinueWith(_onMoveNext);
        }
    }

    /// <summary>
    /// Called when the downstream stage has finished; disposes the enumerator.
    /// </summary>
    public override void OnDownstreamFinish() => DisposeAndComplete();

    /// <summary>
    /// Disposes the enumerator and then closes the stage, failing it when an error
    /// from MoveNextAsync was recorded in <c>_failReason</c>.
    /// </summary>
    private void DisposeAndComplete()
    {
        var disposed = _enumerator.DisposeAsync();
        if (disposed.IsCompletedSuccessfully)
        {
            // Disposal completed immediately.
            CompleteOrFail();
        }
        else
        {
            // Wait for the enumerator to be disposed.
            disposed.AsTask().ContinueWith(_onDisposed);
        }
    }

    /// <summary>Handles a MoveNextAsync call that did not complete on the short path.</summary>
    private void OnMoveNext(Task<bool> task)
    {
        // This is a continuation, so the task is always completed here; only the
        // outcome has to be inspected.
        if (task.IsCompletedSuccessfully)
        {
            if (task.Result)
            {
                // A value was read.
                Push(_outlet, _enumerator.Current);
            }
            else
            {
                // No more values: dispose the enumerator and close the source.
                DisposeAndComplete();
            }
        }
        else
        {
            // The task failed or was cancelled. Record the reason and go through the
            // disposal path, which fails the stage afterwards. Failing the stage
            // directly here would leave the enumerator undisposed.
            _failReason = task.Exception as Exception ?? new TaskCanceledException(task);
            DisposeAndComplete();
        }
    }

    /// <summary>Handles a DisposeAsync call that did not complete on the short path.</summary>
    private void OnDisposed(Task task)
    {
        if (task.IsCompletedSuccessfully)
        {
            // Disposal succeeded, but an earlier MoveNextAsync error must still fail
            // the stage instead of completing it normally.
            CompleteOrFail();
        }
        else
        {
            var reason = task.Exception as Exception
                ?? _failReason
                ?? new TaskCanceledException(task);
            FailStage(reason);
        }
    }

    /// <summary>
    /// Closes the stage after a successful disposal: fails it with the recorded error
    /// if there is one, completes it otherwise.
    /// </summary>
    private void CompleteOrFail()
    {
        if (_failReason is not null)
        {
            FailStage(_failReason);
        }
        else
        {
            CompleteStage();
        }
    }
}

于 2021-09-27T10:19:37.460 回答