2

我有一个工作流程,我尝试执行以下操作:

  • 一个接受回调的方法,它在内部产生 aStream并且该方法的调用者可以使用回调来处理Stream他们想要的任何方式
  • IAsyncEnumerable在一种特殊情况下,调用者使用回调从 Stream中生成一个。

我在下面创建了一个最小的复制示例:

class Program
{
    private static async Task<Stream> GetStream()
    {
        var text =
            @"Multi-line
            string";

        await Task.Yield();

        var bytes = Encoding.UTF8.GetBytes(text);
        return new MemoryStream(bytes);
    }

    private static async Task<T> StreamData<T>(Func<Stream, T> streamAction)
    {
        await using var stream = await GetStream();
        return streamAction(stream);
    }

    private static async Task StreamData(Func<Stream, Task> streamAction)
    {
        await using var stream = await GetStream();
        await streamAction(stream);
    }

    private static async IAsyncEnumerable<string> GetTextLinesFromStream(Stream stream)
    {
        using var reader = new StreamReader(stream);

        var line = await reader.ReadLineAsync();
        while (line != null)
        {
            yield return line;
            line = await reader.ReadLineAsync();
        }
    }

    private static async Task Test1()
    {
        async Task GetRecords(Stream str)
        {
            await foreach(var line in GetTextLinesFromStream(str))
                Console.WriteLine(line);
        }

        await StreamData(GetRecords);
    }

    private static async Task Test2()
    {
        await foreach(var line in await StreamData(GetTextLinesFromStream))
            Console.WriteLine(line);
    }

    static async Task Main(string[] args)
    {
        await Test1();
        await Test2();
    }
}  

在这里,方法Test1工作正常,而Test2没有,失败Stream is not readable。问题在于,在第二种情况下,当代码开始处理实际流时,流已经被释放了。

大概这两个示例之间的区别在于,对于第一个示例,读取流是在仍在一次性的上下文中执行的stream,而在第二个示例中,我们已经退出了。

但是,我认为第二种情况也可能是有效的——至少我觉得它非常符合 C# 习惯。为了让第二个案例也能正常工作,我还有什么遗漏吗?

4

1 回答 1

2

Test2方法的问题在于,在Stream创建IAsyncEnumerable<string>时,而不是在其枚举完成时处理。

Test2方法使用第一个StreamData重载,即返回 a 的重载Task<T>。在T这种情况下是一个IAsyncEnumerable<string>. 因此,该StreamData方法返回一个生成异步序列的任务,然后立即释放流(在生成序列之后)。显然,这不是处理流的正确时机。正确的时机是在await foreach循环完成之后。

为了使Test2工作透明,您应该添加StreamData返回 a Task<IAsyncEnumerable<T>>(而不是Taskor Task<T>)的方法的第三个重载。此重载应返回与可支配资源绑定的专用异步序列,并在其枚举完成时释放此资源。下面是这样一个序列的实现:

public class AsyncEnumerableDisposable<T> : IAsyncEnumerable<T>
{
    private readonly IAsyncEnumerable<T> _source;
    private readonly IAsyncDisposable _disposable;

    public AsyncEnumerableDisposable(IAsyncEnumerable<T> source,
        IAsyncDisposable disposable)
    {
        // Arguments validation omitted
        _source = source;
        _disposable = disposable;
    }

    async IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(
        CancellationToken cancellationToken)
    {
        await using (_disposable.ConfigureAwait(false))
            await foreach (var item in _source
                .WithCancellation(cancellationToken)
                .ConfigureAwait(false)) yield return item;
    }
}

您可以在这样的StreamData方法中使用它:

private static async Task<IAsyncEnumerable<T>> StreamData<T>(
    Func<Stream, IAsyncEnumerable<T>> streamAction)
{
    var stream = await GetStream();
    return new AsyncEnumerableDisposable<T>(streamAction(stream), stream);
}

请记住,通常IAsyncEnumerable<T>可以在其生命周期内多次枚举 an,并且通过将其包装成 anAsyncEnumerableDisposable<T>它本质上简化为单个枚举序列(因为资源将在第一次枚举后被释放)。


替代方案: System.Interactive.Async包包含运算AsyncEnumerableEx.Using符,可用于代替自定义AsyncEnumerableDisposable类:

private static async Task<IAsyncEnumerable<T>> StreamData<T>(
    Func<Stream, IAsyncEnumerable<T>> streamAction)
{
    var stream = await GetStream();
    return AsyncEnumerableEx.Using(() => stream, streamAction);
}

不同之处在于,Stream将通过其Dispose方法同步处置。AFAICS 不支持IAsyncDisposable在此包中处理 s。

这是该AsyncEnumerableEx.Using方法的签名:

// Constructs an async-enumerable sequence that depends on a resource object, whose
// lifetime is tied to the resulting async-enumerable sequence's lifetime.
public static IAsyncEnumerable<TSource> Using<TSource, TResource>(
    Func<TResource> resourceFactory,
    Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory)
    where TResource : IDisposable;
于 2021-03-11T11:39:42.013 回答