4

我想知道是否有一种方法可以创建IAsyncEnumerable<T>IAsyncEnumerator<T>通过 Source 对象创建,而不是像TaskCompletionSource允许一个人做任务一样。特别是,TaskCompletionSource可以像任何其他参数一样传递。

也许是这样的:

public class AsyncEnumerables {

    public Task HandlerTask { get; set; }

    public async Task<string> ParentMethod() {
        var source = new AsyncEnumerableSource<int>();
        IAsyncEnumerable asyncEnumerable = source.GetAsyncEnumerable();
        HandlerTask = Task.Run(() => handleAsyncResultsAsTheyHappen(asyncEnumerable));
        int n = await someOtherTask();
        source.YieldReturn(n);
        var r = await ChildMethod(source);
        source.Complete();  // this call would cause the HandlerTask to complete.
        return r;
    }

    private async Task<string> ChildMethod(AsyncEnumerableSource<int> source) {
        source.YieldReturn(5);
        await SomeOtherCall();
        source.YieldReturn(10);
        return "hello";
    }
}

使用上面的代码,handleAsyncResultsAsTheyHappen任务将看到传递给 YieldReturn 的任何值。所以它会看到nfrom 上面的代码,以及 the510from ChildMethod

4

3 回答 3

4

如果您可以构建代码以利用yield returnawait foreach. 例如,这段代码几乎做同样的事情:

public async Task Consume()
{
    var source = ParentMethod();
    HandlerTask = Task.Run(async () => { await foreach (var item in source) { Console.WriteLine(item); } });
}

public async IAsyncEnumerable<int> ParentMethod()
{
    await Task.Yield();
    yield return 13;
    await foreach (var item in ChildMethod())
        yield return item;
}

private async IAsyncEnumerable<int> ChildMethod()
{
    yield return 5;
    await Task.Yield();
    yield return 10;
}

然而,如果你真的需要一个“异步可枚举源”,你首先需要认清一件事。TaskCompletionSource<T>保存结果,即T(或异常)。它充当容器。可以在等待任务之前设置结果。这与“异步可枚举源”相同——您需要它能够在从中获取任何项目之前保存结果。“异步可枚举源”需要保存多个结果——在本例中是一个集合

所以你实际上要求的是“一个可以作为异步枚举使用的集合”。这里有几种可能性,但我推荐的是Channel

public async Task<string> ParentMethod()
{
  var source = Channel.CreateUnbounded<int>();
  var sourceWriter = source.Writer;
  IAsyncEnumerable<int> asyncEnumerable = source.Reader.ReadAllAsync();
  HandlerTask = Task.Run(async () => { await foreach (var item in asyncEnumerable) Console.WriteLine(item); });
  await Task.Yield();
  await sourceWriter.WriteAsync(13);
  var r = await ChildMethod(sourceWriter);
  sourceWriter.Complete();
  return r;
}

private async Task<string> ChildMethod(ChannelWriter<int> sourceWriter)
{
  await sourceWriter.WriteAsync(5);
  await Task.Yield();
  await sourceWriter.WriteAsync(10);
  return "hello";
}
于 2020-05-01T14:15:33.323 回答
2

AFAIK .NET 平台没有内置类,但使用System.ReactiveSystem.Linq.AsyncAsyncEnumerableSource很容易实现。该库包含由 an和组合而成的类。这是一个方便的类,因为您可以向接口发送通知,并独立订阅接口任意次数以接收这些通知。实际上不需要手动订阅,因为该库包含方便的扩展方法,可以自动转换为。System.ReactiveSubjectIObservableIObserverIObserverIObservableSystem.Linq.AsyncToAsyncEnumerableIObservableIAsyncEnumerable

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;

public class AsyncEnumerableSource<T>
{
    private readonly Subject<T> _subject = new Subject<T>();

    public IAsyncEnumerable<T> GetAsyncEnumerable() => _subject.ToAsyncEnumerable();
    public void YieldReturn(T value) => _subject.OnNext(value);
    public void Complete() => _subject.OnCompleted();
    public void Fault(Exception ex) => _subject.OnError(ex);
}

此实现将仅向订阅者发送订阅后发生的通知。如果您想确保迟到的加入者会收到早期的消息,您可以将 替换SubjectReplaySubject. 这个缓冲它接收到的通知,因此它带有内存使用注意事项:它int bufferSize在其构造函数中接受一个参数。


注意:上面的实现是线程安全的,虽然Subject<T>不是同步OnNext的,并且通常从多个线程并行调用会破坏Rx 契约。这是因为ToAsyncEnumerable运营商不依赖于 Rx 合约的正确性,并同步传入的通知。虽然这不是一个特别有效的实现。Channel<T>基于 -based 的实现在重负载下效率显着提高。

于 2020-05-01T16:09:03.217 回答
1

这是AsyncEnumerableSource该类的另一个实现,它不依赖于 RX 库。这取决于Channel<T>,ImmutableArrayImmutableInterlocked类,所有这些都在 .NET Core 平台上原生可用(也可用于 .NET Framework 作为包)。它与基于 RX 的实现具有相同的行为。

该类AsyncEnumerableSource可以将通知传播给多个订阅者。每个订阅者都可以按照自己的节奏枚举这些通知。这是可能的,因为每个订阅都有自己的专用Channel<T>底层存储。订阅的生命周期实际上与单个await foreach循环的生命周期相关。出于任何原因(包括抛出的异常)提前从循环中中断,立即结束订阅。

在技​​术术语中,每次调用从方法返回的方法GetAsyncEnumerator时都会创建订阅。单独调用该方法不会创建订阅。当关联对象被释放时订阅结束。IAsyncEnumerableGetAsyncEnumerableGetAsyncEnumerableIAsyncEnumerator

public class AsyncEnumerableSource<T>
{
    private IImmutableList<Channel<T>> _channels
        = ImmutableArray<Channel<T>>.Empty;

    public async IAsyncEnumerable<T> GetAsyncEnumerable(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        var channel = Channel.CreateUnbounded<T>();
        ImmutableInterlocked.Update(ref _channels, x => x.Add(channel));
        try
        {
            while (await channel.Reader.WaitToReadAsync(token).ConfigureAwait(false))
            {
                while (channel.Reader.TryRead(out var item))
                {
                    yield return item;
                    token.ThrowIfCancellationRequested();
                }
            }
        }
        finally
        {
            ImmutableInterlocked.Update(ref _channels, x => x.Remove(channel));
        }
    }

    public void YieldReturn(T value)
    {
        foreach (var channel in Volatile.Read(ref _channels))
            channel.Writer.TryWrite(value);
    }
    public void Complete()
    {
        foreach (var channel in Volatile.Read(ref _channels))
            channel.Writer.TryComplete();
    }
    public void Fault(Exception ex)
    {
        foreach (var channel in Volatile.Read(ref _channels))
            channel.Writer.TryComplete(ex);
    }
}
于 2020-05-30T05:17:34.323 回答