31

我有一个主要受 IO 限制的连续任务(后台拼写检查器与拼写检查服务器交谈)。有时,此任务需要暂停并稍后恢复,具体取决于用户活动。

虽然暂停/恢复本质上正是 async/await 所做的事情,但我几乎找不到关于如何为异步方法实现实际的暂停/继续逻辑的资料。有推荐的模式吗?

我也考虑过为此使用 Stephen Toub 的 AsyncManualResetEvent,但觉得这可能有些大材小用。

4

5 回答 5

11

2019 年更新:我最近有机会重新审视这段代码,下面是一个作为控制台应用程序的完整示例(警告:PauseTokenSource 需要充分的单元测试)。

请注意,在我的场景中,要求是:当消费者端代码(即请求暂停的一方)继续执行时,生产者端代码应当已经进入暂停状态。因此,当 UI 准备好反映暂停状态时,所有后台活动都应当已经暂停。

using System;
using System.Threading.Tasks;
using System.Threading;

namespace Console_19613444
{
    class Program
    {
        // PauseTokenSource
        /// <summary>
        /// Controls pausing and resuming of cooperative workers that poll a <see cref="PauseToken"/>.
        /// <see cref="PauseAsync"/> completes only after a worker has acknowledged the request by
        /// calling <see cref="PauseToken.PauseIfRequestedAsync"/>, so the caller can rely on the
        /// worker being parked once the returned task completes.
        /// </summary>
        public class PauseTokenSource
        {
            // True once a worker has confirmed the pause; read and written under _stateAsyncLock.
            bool _paused = false;
            // True from the pause request until resume (or rollback); guarded by _pauseRequestAsyncLock.
            bool _pauseRequested = false;

            // Completed to release workers parked in PauseIfRequestedAsync.
            TaskCompletionSource<bool> _resumeRequestTcs;
            // Completed by a worker that reaches PauseIfRequestedAsync while a pause is requested.
            TaskCompletionSource<bool> _pauseConfirmationTcs;

            // Serializes PauseAsync / ResumeAsync / IsPaused. PauseAsync holds it while waiting for
            // the worker's confirmation, so the other two wait until the pause is settled.
            readonly SemaphoreSlim _stateAsyncLock = new SemaphoreSlim(1);
            // Guards _pauseRequested and both TaskCompletionSource fields.
            readonly SemaphoreSlim _pauseRequestAsyncLock = new SemaphoreSlim(1);

            /// <summary>Gets a token that workers use to observe pause requests from this source.</summary>
            public PauseToken Token { get { return new PauseToken(this); } }

            /// <summary>Returns whether a pause has been requested and confirmed by a worker.</summary>
            /// <param name="token">Cancels the wait for the internal state lock.</param>
            public async Task<bool> IsPaused(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    return _paused;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            /// <summary>Releases parked workers. Does nothing when the source is not paused.</summary>
            /// <param name="token">Cancels the wait for the internal locks.</param>
            public async Task ResumeAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (!_paused)
                    {
                        return;
                    }

                    await ClearPauseRequestAsync(token);
                    // Only flip the flag after the request was really cleared, so a cancelled
                    // resume leaves the source consistently paused.
                    _paused = false;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            /// <summary>
            /// Requests a pause and waits until a worker confirms it. Does nothing when already paused.
            /// </summary>
            /// <param name="token">
            /// Cancels the request. On cancellation the request is withdrawn and any worker that
            /// parked in the meantime is released, then the cancellation exception propagates.
            /// </param>
            public async Task PauseAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (_paused)
                    {
                        return;
                    }

                    Task pauseConfirmationTask = null;

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        _pauseRequested = true;
                        _resumeRequestTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        _pauseConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        pauseConfirmationTask = WaitWithCancellationAsync(_pauseConfirmationTcs.Task, token);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }

                    try
                    {
                        await pauseConfirmationTask;
                    }
                    catch
                    {
                        // The pause never took effect. Without this rollback _pauseRequested would stay
                        // set, the next worker would park, and ResumeAsync (which returns early while
                        // !_paused) could never release it.
                        await ClearPauseRequestAsync(CancellationToken.None);
                        throw;
                    }

                    _paused = true;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            /// <summary>
            /// Withdraws the current pause request and completes the resume signal so that every
            /// parked worker continues.
            /// </summary>
            private async Task ClearPauseRequestAsync(CancellationToken token)
            {
                await _pauseRequestAsyncLock.WaitAsync(token);
                try
                {
                    var resumeRequestTcs = _resumeRequestTcs;
                    _pauseRequested = false;
                    _resumeRequestTcs = null;
                    _pauseConfirmationTcs = null;
                    if (resumeRequestTcs != null)
                    {
                        resumeRequestTcs.TrySetResult(true);
                    }
                }
                finally
                {
                    _pauseRequestAsyncLock.Release();
                }
            }

            /// <summary>
            /// Awaits <paramref name="task"/> but gives up when <paramref name="token"/> is cancelled.
            /// Cancellation only affects this waiter: the shared task is left untouched, so other
            /// waiters on the same signal are not cancelled along with it.
            /// </summary>
            private static async Task WaitWithCancellationAsync(Task task, CancellationToken token)
            {
                if (!token.CanBeCanceled)
                {
                    await task;
                    return;
                }

                var cancellationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                using (token.Register(() => cancellationTcs.TrySetCanceled(token), useSynchronizationContext: false))
                {
                    var finished = await Task.WhenAny(task, cancellationTcs.Task);
                    // Propagates the cancellation when the token won the race.
                    await finished;
                }
            }

            /// <summary>
            /// Called by workers: returns immediately when no pause is requested; otherwise confirms
            /// the pause and waits until it is resumed or <paramref name="token"/> is cancelled.
            /// </summary>
            internal async Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                Task resumeRequestTask = null;

                await _pauseRequestAsyncLock.WaitAsync(token);
                try
                {
                    if (!_pauseRequested)
                    {
                        return;
                    }
                    // Capture the resume signal while holding the lock, before confirming, so a
                    // resume that follows immediately cannot be missed.
                    resumeRequestTask = WaitWithCancellationAsync(_resumeRequestTcs.Task, token);
                    _pauseConfirmationTcs.TrySetResult(true);
                }
                finally
                {
                    _pauseRequestAsyncLock.Release();
                }

                await resumeRequestTask;
            }
        }

        /// <summary>
        /// Lightweight handle given to pausable code. A default-constructed token has no source
        /// and therefore never pauses.
        /// </summary>
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            /// <summary>Returns whether the source is paused; always false for a token without a source.</summary>
            public Task<bool> IsPaused()
            {
                return _source == null ? Task.FromResult(false) : _source.IsPaused();
            }

            /// <summary>
            /// Parks the caller while a pause is requested on the source; completes immediately
            /// when no pause is requested or the token has no source.
            /// </summary>
            public Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                return _source == null ? Task.CompletedTask : _source.PauseIfRequestedAsync(token);
            }
        }

        // Basic usage

        /// <summary>
        /// Sample worker: loops forever, parking at the pause checkpoint whenever a pause is
        /// requested and otherwise simulating one second of work per iteration.
        /// </summary>
        /// <param name="pause">Token polled once per iteration for pause requests.</param>
        /// <param name="token">Cancels the loop, including while it is parked or delaying.</param>
        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                while (true)
                {
                    token.ThrowIfCancellationRequested();

                    Console.WriteLine("Before await pause.PauseIfRequestedAsync()");
                    // Flow the token so that a parked worker can still be cancelled.
                    await pause.PauseIfRequestedAsync(token);
                    Console.WriteLine("After await pause.PauseIfRequestedAsync()");

                    // Flow the token so that cancellation does not have to wait out the delay.
                    await Task.Delay(1000, token);
                }
            }
            catch (Exception e)
            {
                // Log for the demo, then rethrow so the returned task still reflects the failure.
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        /// <summary>
        /// Interactive demo: starts the sample worker, then alternates between pausing and
        /// resuming it each time the user presses Enter.
        /// </summary>
        /// <param name="token">
        /// Cancels the demo loop and the pause/resume calls it is waiting on. The blocking
        /// Console.ReadLine calls are not interrupted by it.
        /// </param>
        static async Task Test(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            // The worker runs concurrently with this loop; its task is intentionally not awaited here.
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                // Flow the token: PauseAsync waits for the worker's acknowledgement and would
                // otherwise be uncancellable if the worker never reaches its checkpoint.
                await pts.PauseAsync(token);
                Console.WriteLine("After pause requested, paused: " + await pts.IsPaused(token));

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                await pts.ResumeAsync(token);
                Console.WriteLine("After resume");
            }
        }

        /// <summary>Console entry point: runs the interactive pause/resume demo with a token that is never cancelled.</summary>
        static async Task Main() => await Test(CancellationToken.None);
    }
}
于 2014-02-11T20:42:19.320 回答
9

考虑到您当前的代码有多混乱,AsyncManualResetEvent 正是您所需要的。但更好的解决方案是使用 Stephen Toub 的另一种方法:PauseToken。它的工作原理与 AsyncManualResetEvent 类似,只是它的接口是专门为此目的而设计的。

于 2013-10-27T10:12:32.293 回答
5

这对我有用

        using System;

        using System.Threading;
        using System.Threading.Tasks;

        namespace TaskTest2
        {

            /// <summary>
            /// Demo of pausing a background loop with a <see cref="ManualResetEvent"/>: the loop
            /// runs while the event is signaled and blocks in WaitOne() while it is reset.
            /// </summary>
            class Program
            {
                // Signaled = worker may run; reset = worker blocks at its next WaitOne().
                static readonly ManualResetEvent mre = new ManualResetEvent(false);

                static void Main(string[] args)
                {
                    // Start in the "running" state.
                    mre.Set();

                    // The loop never ends and blocks in WaitOne(), so ask for a dedicated thread
                    // (LongRunning) instead of tying up a thread-pool worker indefinitely.
                    Task.Factory.StartNew(() =>
                    {
                        while (true)
                        {
                            Console.WriteLine("________________");
                            mre.WaitOne();
                        }
                    }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

                    // Let the worker run for 10 s, then pause it.
                    Thread.Sleep(10000);
                    mre.Reset();
                    Console.WriteLine("Task Paused");

                    // Stay paused for 10 s, announce the resume, then resume 1 s later.
                    Thread.Sleep(10000);
                    Console.WriteLine("Task Will Resume After 1 Second");
                    Thread.Sleep(1000);
                    mre.Set();

                    // Run for another 10 s and pause again.
                    Thread.Sleep(10000);
                    mre.Reset();
                    Console.WriteLine("Task Paused");

                    // Keep the process alive until the user presses a key.
                    Console.Read();
                }
            }
        }
于 2017-01-11T13:50:07.060 回答
4

就 async/await 编程而言,其他答案似乎要么过于复杂,要么没有抓住要点:它们会占用线程,这既耗费 CPU,又可能导致死锁。经过大量的试验、失败和许多次死锁之后,下面的做法终于在我的高负载测试中正常工作了。

// Park here until cancellationToken is cancelled; cancelling the token is the "resume" signal.
var isWaiting = true;
do
{
    try
    {
        // Asynchronous wait of up to 10 seconds per round. The delay ends early, by throwing,
        // as soon as the token is cancelled; otherwise the loop simply starts another round.
        await Task.Delay(10000, cancellationToken);
    }
    catch (TaskCanceledException)
    {
        // Cancellation is expected here: treat it as the notification to continue.
        isWaiting = false;
    }
}
while (isWaiting);
于 2020-02-14T06:47:57.127 回答
2

好吧,也许这值得回答,但我对 C# 不是很熟悉,而且我这里没有 MonoDevelop,现在是凌晨 3 点,所以请见谅。

我建议这样的事情

/// <summary>
/// Skeleton of a background spell checker that can be started and stopped.
/// "Pausing" is modelled as stopping the current background task and starting a new one later.
/// StartSpellchecker and StopSpellchecker are not synchronized against each other; call them
/// from a single thread.
/// </summary>
class Spellchecker
{
  // Cancellation source for the task currently stored in currentTask (null when none).
  private CancellationTokenSource mustStop = null;
  // The running background task, or null when nothing has been started or it was stopped.
  private volatile Task currentTask = null;

  //TODO add other state variables as needed

  /// <summary>Starts the background spell check. Does nothing if one is still running.</summary>
  public void StartSpellchecker()
  {
    var running = currentTask;
    if (running != null && !running.IsCompleted)
    {
      // A task is already running: silently return instead of starting a second one.
      return;
    }

    if (mustStop != null)
    {
      // Left over from a task that ended without StopSpellchecker being called.
      mustStop.Dispose();
    }

    mustStop = new CancellationTokenSource();
    var ct = mustStop.Token;
    // Task.Run returns an already-started task; calling Start() on a task produced by an
    // async method throws InvalidOperationException. Running on the thread pool also keeps
    // the loop from executing synchronously inside this method.
    currentTask = Task.Run(() => SpellcheckAsync(ct));
  }

  /// <summary>Runs the spell-check loop until <paramref name="ct"/> is cancelled.</summary>
  private async Task SpellcheckAsync(CancellationToken ct)
  {
    while (!ct.IsCancellationRequested)
    {
      /*
      * TODO perform spell check (await the I/O here and pass ct along)
      * This method must be the only one accessing
      * the spellcheck-related state variables
      */
    }
  }

  /// <summary>
  /// Requests the background task to stop and waits until it has actually finished.
  /// Does nothing if no task was started.
  /// </summary>
  public async Task StopSpellchecker()
  {
    var task = currentTask;
    if (task == null)
    {
      // There is no task running: silently return.
      return;
    }

    /*
    * A CancelAfter(TimeSpan) method
    * is also available, which might interest you
    */
    mustStop.Cancel();

    try
    {
      // Remove this await if you don't want to wait for the task to actually stop.
      await task;
    }
    finally
    {
      // Reset the state here rather than at the end of the worker, where clearing
      // currentTask could race with the assignment in StartSpellchecker.
      mustStop.Dispose();
      mustStop = null;
      currentTask = null;
    }
  }
}
于 2013-10-27T02:08:35.933 回答