0

我正在尝试为集合生成一个稳定的 IAsyncEnumerator。我正在使用 C# 7.3,我的目标框架是 .Net Standard 2.1 和 .Net Standard 2.0。

枚举器只有在我调试时才稳定。我基本上抓取了一个长度为 4 的对象列表。Promise 在列表中的最后一项附近或在我不太了解的情况下变得无效。我正在尝试扩展我的工具包,但是异步编程需要一些工作。我在网上四处寻找稳定迭代器的文章和示例。但一个都找不到。事实上,我发现的唯一一件事是 C# 8 将为您构建它。我完全不同意。人们应该了解他们选择使用的工具。

我根据我能找到的内容生成了以下代码。

单元测试

[Test]
public void ShouldBeAbleToLoadResultSet()
{
    FluentActions.Invoking(async () =>
    {
        var cts = new CancellationTokenSource();

        var people = Context.People
        .AsAsyncSubSonicQueryable()
        .LoadAsync(cts.Token);

        int cnt = 0;

        await foreach(Person person in people.Result
            .WithCancellation(cts.Token)
            .ConfigureAwait(true))
        {
            person.FullName.Should().Be(String.Format("{0}, {1}{2}",
                person.FamilyName, person.FirstName,
                string.IsNullOrEmpty(person.MiddleInitial?.Trim()) ? "" : $" {person.MiddleInitial}."));

            cnt++;
        }

        cnt.Should().Be(Context.People.Count);

    }).Should().NotThrow();

}

处理 IAsyncEnumerator 的部分类文件

using System;
using System.Collections.Generic;
using System.Diagnostics;
#if NETSTANDARD2_1
using System.Diagnostics.CodeAnalysis;
#endif
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace SubSonic.Infrastructure
{
    public sealed partial class SubSonicCollection<TElement>
        : IAsyncEnumerator<TElement>
        , IAsyncStateMachine
        , IValueTaskSource<bool>
        , IValueTaskSource
        , IDisposable
    {
        private static readonly Action<object> CallbackCompleted = _ => Debug.Assert(false, "should not be invoked!");
        private Action<Object> continuation;
        private AsyncIteratorMethodBuilder asyncMethodBuilder;
        private CancellationTokenSource CancellationTokenSource;
        public CancellationToken CancellationToken => CancellationTokenSource?.Token ?? default;
        private ManualResetValueTaskSourceCore<bool> PromiseOfValueOrEnd;
        private TaskAwaiter StateAwaiter;
        private IAsyncStateMachine sm;

        private ExecutionContext executionContext;
        private object scheduler;
        private object state;

        private short token;
        private bool? result;
        private int index;
        private int? istate;
        private bool disposed;

        public TElement Current { get; private set; }

        private TElement GetElementAt(int idx)
        {
            if (TableData is IEnumerable<TElement> data)
            {
                return data.ElementAt(idx);
            }

            return default(TElement);
        }

        IAsyncEnumerator<TElement> IAsyncEnumerable<TElement>.GetAsyncEnumerator(CancellationToken cancellationToken)
        {
            asyncMethodBuilder = new AsyncIteratorMethodBuilder();
            CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

            disposed = false;

            return this;
        }

        public async ValueTask DisposeAsync()
        {
            await Task.CompletedTask.ConfigureAwait(false);

            Dispose();
        }

        public void MoveNext()
        {
            try
            {
                TaskAwaiter awaiter;

                if (disposed)
                {
                    goto DONE_ITERATING;
                }

                switch(istate.GetValueOrDefault(-1))
                {
                    case 0:
                        awaiter = StateAwaiter;
                        goto DONE_AWAIT;
                    case -4:
                        index++;
                        goto LOOP_CONDITION;
                    default:
                        index = 0;
                        goto LOOP_CONDITION;
                }
            LOOP_CONDITION:
                if (index >= Count)
                {
                    goto DONE_ITERATING;
                }
                awaiter = Task.Delay(index, CancellationToken).GetAwaiter();
                if(!awaiter.IsCompleted)
                {
                    istate = 0;
                    StateAwaiter = awaiter;
                    SetStateMachine(this);
                    asyncMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref sm);
                    return;
                }
            DONE_AWAIT:
                awaiter.GetResult();
                Current = GetElementAt(index);
                istate = -4;
                goto RETURN_TRUE_FROM_MOVENEXTASYNC;
            DONE_ITERATING:
                istate = -2;
                CancellationTokenSource?.Dispose();
                PromiseOfValueOrEnd.SetResult(result: false);
                return;
            RETURN_TRUE_FROM_MOVENEXTASYNC:
                PromiseOfValueOrEnd.SetResult(result: true);
            }
            catch(OperationCanceledException ex)
            {
                AbortExecution(ex);
            }
            catch(InvalidOperationException ex)
            {
                AbortExecution(ex);
            }

        }

        private void AbortExecution(Exception ex)
        {
            istate = -2;
            CancellationTokenSource?.Dispose();
            PromiseOfValueOrEnd.SetException(ex);
        }

        public ValueTask<bool> MoveNextAsync()
        {
            if (istate == -2)
            {
                return default;
            }

            ResetAndReleaseOperation();

            SetStateMachine(this);

            asyncMethodBuilder.MoveNext(ref sm);

            if (GetStatus(this.token) == ValueTaskSourceStatus.Succeeded)
            {
                GetResult(this.token);

                Debug.Assert(result.HasValue, "failed to complete");

                return new ValueTask<bool>(result.GetValueOrDefault());
            }
            else
            {
                return new ValueTask<bool>(this, token);
            }
        }

        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        {
            if (this.token != token)
            {
                throw Error.InvalidOperation(SubSonicErrorMessages.MultipleContinuations);
            }

            if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
            {
                this.executionContext = ExecutionContext.Capture();
            }

            if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
            {
                SynchronizationContext sc = SynchronizationContext.Current;

                if (sc != null && sc.GetType() == typeof(SynchronizationContext))
                {
                    this.scheduler = sc;
                }
                else
                {
                    TaskScheduler ts = TaskScheduler.Current;
                    if (ts != TaskScheduler.Default)
                    {
                        this.scheduler = ts;
                    }
                }
            }

            this.state = state;

            var previousContinuation = Interlocked.CompareExchange(ref this.continuation, continuation, null);

            if (previousContinuation != null)
            {
                if (!ReferenceEquals(previousContinuation, CallbackCompleted))
                {
                    throw Error.InvalidOperation(SubSonicErrorMessages.ErrorPreviousContinuation);
                }

                this.executionContext = null;
                this.state = null;
            }

            InvokeContinuation(continuation, state);
        }

        private void InvokeContinuation(Action<object> continuation, object state)
        {
            if (continuation is null)
            {
                return;
            }

            object scheduler = this.scheduler ?? TaskScheduler.Default;

            if (scheduler != null)
            {
                if (scheduler is SynchronizationContext sc)
                {
                    sc.Post(s =>
                    {
                        var t = (Tuple<Action<object>, object>)s;
                        t.Item1(t.Item2);
                    }, Tuple.Create(continuation, state));
                }
                else if (scheduler is TaskScheduler ts)
                {
                    Task.Factory.StartNew(continuation, state, CancellationToken, TaskCreationOptions.DenyChildAttach, ts);
                }
                else
                {
                    throw Error.NotSupported($"{scheduler}");
                }
            }
#if NETSTANDARD2_1
            else if (PromiseOfValueOrEnd.RunContinuationsAsynchronously)
            {
                ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true);
            }
#endif
            else
            {
                continuation(state);
            }

        }

        public void SetStateMachine(IAsyncStateMachine stateMachine)
        {
            sm = stateMachine;
        }

        public ValueTaskSourceStatus GetStatus(short token)
        {
            if (this.token != token)
            {
                throw Error.InvalidOperation(SubSonicErrorMessages.MultipleContinuations);
            }

            return PromiseOfValueOrEnd.GetStatus(token);
        }

        public void GetResult(short token)
        {
            if (this.token != token)
            {
                throw Error.InvalidOperation(SubSonicErrorMessages.MultipleContinuations);
            }

            this.result = PromiseOfValueOrEnd.GetResult(token);
        }

        bool IValueTaskSource<bool>.GetResult(short token)
        {
            if (this.token != token)
            {
                throw Error.InvalidOperation(SubSonicErrorMessages.MultipleContinuations);
            }

            while (GetStatus(token) == ValueTaskSourceStatus.Pending)
            {
                asyncMethodBuilder.MoveNext(ref sm);
            }

            GetResult(token);

            return result.Value;
        }

        private void ResetAndReleaseOperation()
        {
            CancellationToken.ThrowIfCancellationRequested();

            PromiseOfValueOrEnd.Reset();

            this.token = PromiseOfValueOrEnd.Version;
            this.continuation = null;
            this.scheduler = null;
            this.state = null;
            this.result = null;
        }

        private void Dispose(bool disposing)
        {
            if (!disposed)
            {
                if (disposing)
                {
                    CancellationTokenSource?.Dispose();
                    CancellationTokenSource = null;

                    istate = null;
                }

                // TODO: free unmanaged resources (unmanaged objects) and override finalizer
                // TODO: set large fields to null
                disposed = true;
            }
        }

        // // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
        // ~SubSonicCollection()
        // {
        //     // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
        //     Dispose(disposing: false);
        // }

        public void Dispose()
        {
            // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }
    }
}
4

0 回答 0