我有一个主要受 IO 限制的连续任务(后台拼写检查器与拼写检查服务器交谈)。有时,此任务需要暂停并稍后恢复,具体取决于用户活动。
虽然暂停/恢复本质上是什么async/await
,但我发现很少有关于如何为异步方法实现实际暂停/播放逻辑的信息。有推荐的模式吗?
我也考虑过AsyncManualResetEvent
为此使用 Stephen Toub,但认为这可能是矫枉过正。
我有一个主要受 IO 限制的连续任务(后台拼写检查器与拼写检查服务器交谈)。有时,此任务需要暂停并稍后恢复,具体取决于用户活动。
虽然暂停/恢复本质上是什么async/await
,但我发现很少有关于如何为异步方法实现实际暂停/播放逻辑的信息。有推荐的模式吗?
我也考虑过AsyncManualResetEvent
为此使用 Stephen Toub,但认为这可能是矫枉过正。
为 2019 年更新,我最近有机会重新访问此代码,下面是作为控制台应用程序的完整示例(警告:PauseTokenSource
需要良好的单元测试)。
请注意,在我的情况下,要求是当消费者端代码(请求暂停)将继续时,生产者端代码应该已经达到暂停状态。因此,当 UI 准备好反映暂停状态时,预计所有后台活动都已暂停。
using System;
using System.Threading.Tasks;
using System.Threading;
namespace Console_19613444
{
class Program
{
// PauseTokenSource
public class PauseTokenSource
{
bool _paused = false;
bool _pauseRequested = false;
TaskCompletionSource<bool> _resumeRequestTcs;
TaskCompletionSource<bool> _pauseConfirmationTcs;
readonly SemaphoreSlim _stateAsyncLock = new SemaphoreSlim(1);
readonly SemaphoreSlim _pauseRequestAsyncLock = new SemaphoreSlim(1);
public PauseToken Token { get { return new PauseToken(this); } }
public async Task<bool> IsPaused(CancellationToken token = default(CancellationToken))
{
await _stateAsyncLock.WaitAsync(token);
try
{
return _paused;
}
finally
{
_stateAsyncLock.Release();
}
}
public async Task ResumeAsync(CancellationToken token = default(CancellationToken))
{
await _stateAsyncLock.WaitAsync(token);
try
{
if (!_paused)
{
return;
}
await _pauseRequestAsyncLock.WaitAsync(token);
try
{
var resumeRequestTcs = _resumeRequestTcs;
_paused = false;
_pauseRequested = false;
_resumeRequestTcs = null;
_pauseConfirmationTcs = null;
resumeRequestTcs.TrySetResult(true);
}
finally
{
_pauseRequestAsyncLock.Release();
}
}
finally
{
_stateAsyncLock.Release();
}
}
public async Task PauseAsync(CancellationToken token = default(CancellationToken))
{
await _stateAsyncLock.WaitAsync(token);
try
{
if (_paused)
{
return;
}
Task pauseConfirmationTask = null;
await _pauseRequestAsyncLock.WaitAsync(token);
try
{
_pauseRequested = true;
_resumeRequestTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_pauseConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
pauseConfirmationTask = WaitForPauseConfirmationAsync(token);
}
finally
{
_pauseRequestAsyncLock.Release();
}
await pauseConfirmationTask;
_paused = true;
}
finally
{
_stateAsyncLock.Release();
}
}
private async Task WaitForResumeRequestAsync(CancellationToken token)
{
using (token.Register(() => _resumeRequestTcs.TrySetCanceled(), useSynchronizationContext: false))
{
await _resumeRequestTcs.Task;
}
}
private async Task WaitForPauseConfirmationAsync(CancellationToken token)
{
using (token.Register(() => _pauseConfirmationTcs.TrySetCanceled(), useSynchronizationContext: false))
{
await _pauseConfirmationTcs.Task;
}
}
internal async Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
{
Task resumeRequestTask = null;
await _pauseRequestAsyncLock.WaitAsync(token);
try
{
if (!_pauseRequested)
{
return;
}
resumeRequestTask = WaitForResumeRequestAsync(token);
_pauseConfirmationTcs.TrySetResult(true);
}
finally
{
_pauseRequestAsyncLock.Release();
}
await resumeRequestTask;
}
}
// PauseToken - consumer side
public struct PauseToken
{
readonly PauseTokenSource _source;
public PauseToken(PauseTokenSource source) { _source = source; }
public Task<bool> IsPaused() { return _source.IsPaused(); }
public Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
{
return _source.PauseIfRequestedAsync(token);
}
}
// Basic usage
public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
{
try
{
while (true)
{
token.ThrowIfCancellationRequested();
Console.WriteLine("Before await pause.PauseIfRequestedAsync()");
await pause.PauseIfRequestedAsync();
Console.WriteLine("After await pause.PauseIfRequestedAsync()");
await Task.Delay(1000);
}
}
catch (Exception e)
{
Console.WriteLine("Exception: {0}", e);
throw;
}
}
static async Task Test(CancellationToken token)
{
var pts = new PauseTokenSource();
var task = DoWorkAsync(pts.Token, token);
while (true)
{
token.ThrowIfCancellationRequested();
Console.WriteLine("Press enter to pause...");
Console.ReadLine();
Console.WriteLine("Before pause requested");
await pts.PauseAsync();
Console.WriteLine("After pause requested, paused: " + await pts.IsPaused());
Console.WriteLine("Press enter to resume...");
Console.ReadLine();
Console.WriteLine("Before resume");
await pts.ResumeAsync();
Console.WriteLine("After resume");
}
}
static async Task Main()
{
await Test(CancellationToken.None);
}
}
}
AsyncManualResetEvent
考虑到您当前的代码有多混乱,这正是您所需要的。但更好的解决方案是使用 Stephen Toub 的另一种方法:PauseToken
. 它的工作原理与 类似AsyncManualResetEvent
,只是它的接口是专门为此目的而设计的。
这对我有用
using System;
using System.Threading;
using System.Threading.Tasks;
namespace TaskTest2
{
class Program
{
static ManualResetEvent mre = new ManualResetEvent(false);
static void Main(string[] args)
{
mre.Set();
Task.Factory.StartNew(() =>
{
while (true)
{
Console.WriteLine("________________");
mre.WaitOne();
}
} );
Thread.Sleep(10000);
mre.Reset();
Console.WriteLine("Task Paused");
Thread.Sleep(10000);
Console.WriteLine("Task Will Resume After 1 Second");
Thread.Sleep(1000);
mre.Set();
Thread.Sleep(10000);
mre.Reset();
Console.WriteLine("Task Paused");
Console.Read();
}
}
}
当涉及到异步/等待编程时,所有其他答案似乎要么很复杂,要么没有标记,因为它持有 CPU 昂贵且可能导致死锁的线程。经过大量的试验、错误和许多死锁,这终于适用于我的高使用率测试。
var isWaiting = true;
while (isWaiting)
{
try
{
//A long delay is key here to prevent the task system from holding the thread.
//The cancellation token allows the work to resume with a notification
//from the CancellationTokenSource.
await Task.Delay(10000, cancellationToken);
}
catch (TaskCanceledException)
{
//Catch the cancellation and it turns into continuation
isWaiting = false;
}
}
好吧,也许这值得回答,但我对 C# 不是很熟悉,而且我这里没有 MonoDevelop,现在是凌晨 3 点,所以请见谅。
我建议这样的事情
class Spellchecker
{
private CancellationTokenSource mustStop = null;
private volatile Task currentTask = null;
//TODO add other state variables as needed
public void StartSpellchecker()
{
if (currentTask != null)
{
/*
* A task is already running,
* you can either throw an exception
* or silently return
*/
}
mustStop = new CancellationTokenSource();
currentTask = SpellcheckAsync(mustStop.Token);
currentTask.Start();
}
private async Task SpellcheckAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested))
{
/*
* TODO perform spell check
* This method must be the only one accessing
* the spellcheck-related state variables
*/
}
currentTask = null;
}
public async Task StopSpellchecker()
{
if (currentTask == null)
{
/*
* There is no task running
* you can either throw an exception
* or silently return
*/
}
else
{
/*
* A CancelAfter(TimeSpan) method
* is also available, which might interest you
*/
mustStop.Cancel();
//Remove the following lines if you don't want to wait for the task to actually stop
var task = currentTask;
if (task != null)
{
await task;
}
}
}
}