19

是否有推荐的自行取消和重新启动任务的既定模式?

例如,我正在开发后台拼写检查器的 API。每个拼写检查会话都包装为一个 Task。每个新会话都应取消前一个会话并等待其终止(以便正确重用拼写检查服务提供程序等资源)。

我想出了这样的事情:

/// <summary>
/// Runs at most one spell-check session at a time. Starting a new session
/// cancels the pending one and waits for it to terminate before the new
/// session begins.
/// </summary>
class Spellchecker
{
    Task pendingTask = null; // pending session
    CancellationTokenSource cts = null; // CTS for pending session

    /// <summary>
    /// Starts a new spell-check session, cancelling and awaiting the pending one first.
    /// Called by the client app; may be re-entered while an earlier call is still awaiting.
    /// </summary>
    /// <param name="token">Client token; linked into the CTS that controls this session.</param>
    /// <returns>The result of <see cref="SpellcheckAsyncHelper"/> for this session.</returns>
    /// <exception cref="OperationCanceledException">
    /// The client token was cancelled, or a newer call superseded this session.
    /// </exception>
    public async Task<bool> SpellcheckAsync(CancellationToken token)
    {
        // SpellcheckAsync can be re-entered, so take one snapshot of the shared
        // state and publish the new CTS before the first await.
        var previousCts = this.cts;
        var previousTask = this.pendingTask;
        var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
        this.cts = newCts;

        if (previousTask != null && !previousTask.IsCompleted)
        {
            // cancel the previous session and wait for its termination
            if (!previousCts.IsCancellationRequested)
                previousCts.Cancel();

            // Await the snapshot, not the field: the continuation below resets
            // this.pendingTask to null, and it may already have run by the time
            // Cancel() returns.
            // this is not expected to throw
            // as the task is wrapped with ContinueWith
            await previousTask;
        }

        // A newer call may have cancelled this session while it was waiting.
        newCts.Token.ThrowIfCancellationRequested();
        var newTask = SpellcheckAsyncHelper(newCts.Token);

        this.pendingTask = newTask.ContinueWith((t) => {
            this.pendingTask = null;
            // we don't need to know the result here, just log the status
            Debug.Print(((object)t.Exception ?? (object)t.Status).ToString());
        }, TaskContinuationOptions.ExecuteSynchronously);

        return await newTask;
    }

    /// <summary>The actual task logic for one session.</summary>
    /// <param name="token">Token that cancels this session.</param>
    /// <exception cref="ApplicationException">A previous session is still pending.</exception>
    async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
    {
        // do not start a new session if the the previous one still pending
        if (IsPendingSession())
            throw new ApplicationException("Cancel the previous session first.");

        // do the work (pretty much IO-bound)
        try
        {
            bool doMore = true;
            while (doMore)
            {
                token.ThrowIfCancellationRequested();
                // placeholder to call the provider; the token is passed so that
                // cancellation ends the wait immediately instead of after 500 ms
                await Task.Delay(500, token);
            }
            return doMore;
        }
        finally
        {
            // clean-up the resources
        }
    }

    /// <summary>Returns true while a session has been started and has not yet finished.</summary>
    public bool IsPendingSession()
    {
        // Read the field once; IsCompleted is already true for canceled and faulted tasks.
        var task = this.pendingTask;
        return task != null && !task.IsCompleted;
    }
}

客户端应用程序(UI)应该能够根据需要多次调用 SpellcheckAsync,而不必担心取消挂起的会话。主 doMore 循环在 UI 线程上运行(因为它涉及 UI,而所有对拼写检查服务提供程序的调用都是 IO 密集型的)。

我不得不将 API 分成 SpellcheckAsync 和 SpellcheckAsyncHelper 两部分,这让我感到有点不舒服,但我想不出更好的方法来做到这一点,而且它还有待测试。

4

4 回答 4

22

我认为总体概念非常好,但我建议您不要使用 ContinueWith。

我会直接使用常规的 await 来编写它,这样就不需要那么多“我已经在运行”的逻辑:

Task pendingTask = null; // pending session
CancellationTokenSource cts = null; // CTS for pending session

/// <summary>
/// Starts a new spell-check session, cancelling the previous one and waiting
/// for it to terminate first. Called by the client app on the UI thread.
/// </summary>
/// <param name="token">Client token; linked into the CTS that controls this session.</param>
/// <returns>The result of <c>SpellcheckAsyncHelper</c> for this session.</returns>
/// <exception cref="OperationCanceledException">
/// The client token was cancelled, or a newer call superseded this session.
/// </exception>
public async Task<bool> SpellcheckAsync(CancellationToken token)
{
    // SpellcheckAsync can be re-entered, so publish the new CTS before the first await
    var previousCts = this.cts;
    var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
    this.cts = newCts;

    if (previousCts != null)
    {
        // cancel the previous session and wait for its termination
        previousCts.Cancel();

        // pendingTask is still null when the earlier call threw before it
        // started its helper; check for that explicitly instead of relying
        // on the catch below to swallow a NullReferenceException.
        var previousTask = this.pendingTask;
        if (previousTask != null)
        {
            // Deliberately ignore the outcome: the earlier caller awaits the
            // same task and observes its result or exception there.
            try { await previousTask; } catch { }
        }
    }

    // A newer call may have cancelled this session while it was waiting.
    newCts.Token.ThrowIfCancellationRequested();
    this.pendingTask = SpellcheckAsyncHelper(newCts.Token);
    return await this.pendingTask;
}

// the actual task logic
async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
{
    // do the work (pretty much IO-bound)
    using (...)
    {
        bool doMore = true;
        while (doMore)
        {
            token.ThrowIfCancellationRequested();
            await Task.Delay(500); // placeholder to call the provider
        }
        return doMore;
    }
}
于 2013-09-25T12:34:55.737 回答
5

这是我使用的取消并重新启动模式的最新版本:

/// <summary>
/// Cancel-and-restart wrapper around one unit of asynchronous work: every
/// <see cref="Start"/> cancels the previously started run and chains the new
/// run after it.
/// </summary>
class AsyncWorker
{
    Task _pendingTask;
    CancellationTokenSource _pendingTaskCts;

    // The worker itself: logs "Start.", waits 100 ms (cancellable), logs "Done.".
    async Task DoWorkAsync(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();

        Debug.WriteLine("Start.");
        await Task.Delay(100, token);
        Debug.WriteLine("Done.");
    }

    // Body of one run: wait for the older run to wind down, then do the work
    // unless this run has itself been superseded in the meantime.
    async Task RunAfterPreviousAsync(Task olderTask, CancellationTokenSource ownCts)
    {
        // The older run has already been asked to cancel; just wait for it.
        if (olderTask != null)
        {
            await olderTask.WaitObservingCancellationAsync();
        }

        // A newer Start call cancels ownCts, in which case this run stops here.
        ownCts.Token.ThrowIfCancellationRequested();

        await DoWorkAsync(ownCts.Token).WaitObservingCancellationAsync();
    }

    /// <summary>Starts the work, or restarts it if a run is already pending.</summary>
    /// <param name="token">Caller's token; linked into the CTS owned by this run.</param>
    public void Start(CancellationToken token)
    {
        var olderTask = _pendingTask;
        var olderCts = _pendingTaskCts;
        var ownCts = CancellationTokenSource.CreateLinkedTokenSource(token);

        _pendingTask = null;
        _pendingTaskCts = ownCts;

        // Ask the older run to cancel, but only if it is still in flight.
        var olderStillRunning = olderTask != null && !olderTask.IsCompleted;
        if (olderStillRunning)
        {
            olderCts.Cancel();
        }

        // Schedule the run on the current synchronization context and unwrap
        // the nested task so _pendingTask completes when the run itself does.
        Func<Task> launch = () => RunAfterPreviousAsync(olderTask, ownCts);
        var outer = Task.Factory.StartNew(
            launch,
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.FromCurrentSynchronizationContext());
        _pendingTask = outer.Unwrap();
    }

    /// <summary>
    /// Requests cancellation of the pending run. On an MTA thread it also blocks
    /// until the run has finished; the STA branch is not implemented yet.
    /// </summary>
    public void Stop()
    {
        // Nothing was started, or the run already ended as cancelled.
        if (_pendingTask == null || _pendingTask.IsCanceled)
        {
            return;
        }

        // A faulted run: Wait() throws its exception right away.
        if (_pendingTask.IsFaulted)
        {
            _pendingTask.Wait();
        }

        if (_pendingTask.IsCompleted)
        {
            return;
        }

        // Still running: request cancellation once.
        if (!_pendingTaskCts.IsCancellationRequested)
        {
            _pendingTaskCts.Cancel();
        }

        var apartment = System.Threading.Thread.CurrentThread.GetApartmentState();
        if (apartment == ApartmentState.MTA)
        {
            // MTA: a blocking wait is acceptable here.
            _pendingTask.WaitObservingCancellation();
        }

        // TODO: STA, async to sync wait bridge with DoEvents,
        // similarly to Thread.Join
    }
}

// useful extensions
/// <summary>Helpers for waiting on tasks while treating cancellation as a normal outcome.</summary>
public static class Extras
{
    /// <summary>
    /// Tells whether <paramref name="ex"/> represents cancellation only.
    /// </summary>
    /// <param name="ex">The exception to inspect; null yields false.</param>
    /// <returns>
    /// True for an <see cref="OperationCanceledException"/>, or for an
    /// <see cref="AggregateException"/> whose flattened inner exceptions are all
    /// <see cref="OperationCanceledException"/> (and there is at least one).
    /// </returns>
    public static bool IsOperationCanceledException(this Exception ex)
    {
        if (ex is OperationCanceledException)
            return true;

        var aggEx = ex as AggregateException;
        if (aggEx == null)
            return false;

        // Look at every inner exception, not just the first: an aggregate that
        // also carries a real failure must not be reported as a cancellation.
        // Flatten() unwraps nested aggregates first.
        var innerExceptions = aggEx.Flatten().InnerExceptions;
        if (innerExceptions.Count == 0)
            return false;

        foreach (var inner in innerExceptions)
        {
            if (!(inner is OperationCanceledException))
                return false;
        }

        return true;
    }

    /// <summary>
    /// Asynchronously waits for the task to complete, observing its exceptions.
    /// Cancellation is swallowed; any other exception is rethrown.
    /// </summary>
    /// <param name="task">The task to await.</param>
    public static async Task WaitObservingCancellationAsync(this Task task)
    {
        try
        {
            await task;
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!ex.IsOperationCanceledException())
                throw;
        }
    }

    /// <summary>
    /// Blocks until the task completes, observing its exceptions.
    /// Cancellation is swallowed; any other exception is rethrown.
    /// </summary>
    /// <param name="task">The task to wait on.</param>
    public static void WaitObservingCancellation(this Task task)
    {
        try
        {
            task.Wait();
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!ex.IsOperationCanceledException())
                throw;
        }
    }
}

测试用法(DoWorkAsync 只会产生一次“Start/Done”输出):

// Form load handler used as a smoke test: calls Start ten times in a row on
// a single AsyncWorker, each call with CancellationToken.None.
private void MainForm_Load(object sender, EventArgs e)
{
    var asyncWorker = new AsyncWorker();

    var startsLeft = 10;
    while (startsLeft > 0)
    {
        asyncWorker.Start(CancellationToken.None);
        startsLeft--;
    }
}
于 2013-12-02T04:31:17.290 回答
0

当异步方法被快速连续调用多次(例如四次)时,上面的示例似乎存在问题:此时对该方法的所有后续调用都只会取消第一个任务,最终产生三个同时运行的新任务。所以我想出了这个:

    // Every task created by ParameterExtraction, paired with the CancellationTokenSource
    // whose token it was created with. Reset to a fresh list at the end of each call.
    private List<Tuple<Task, CancellationTokenSource>> _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>();

    /// <summary>
    /// Creates a new background task, cancels and awaits the tasks registered by
    /// earlier calls that are currently running, then starts and awaits the new task.
    /// </summary>
    /// <remarks>This method is asynchronous, i.e. it runs partly in the background. As this method might be called multiple times
    /// quickly after each other, a mechanism has been implemented so that <b>all</b> tasks from previous method calls are first canceled before the task is started anew.
    /// It is declared <c>async void</c>, so it returns no task for the caller to await.</remarks>
    public async void ParameterExtraction() {

        CancellationTokenSource newCancellationTokenSource = new CancellationTokenSource();

        // Define the task which shall run in the background.
        // NOTE(review): the braces inside this lambda do not balance as shown; the
        // real work (presumably a loop that checks the token) appears to have been
        // elided from the snippet - confirm against the original code.
        Task newTask = new Task(() => {
            // do some work here
                }
            }
        }, newCancellationTokenSource.Token);

        _parameterExtractionTasks.Add(new Tuple<Task, CancellationTokenSource>(newTask, newCancellationTokenSource));

        /* Convert the list to arrays as an exception is thrown if the number of entries in a list changes while
         * we are in a for loop. This can happen if this method is called again while we are waiting for a task. */
        Task[] taskArray = _parameterExtractionTasks.ConvertAll(item => item.Item1).ToArray();
        CancellationTokenSource[] tokenSourceArray = _parameterExtractionTasks.ConvertAll(item => item.Item2).ToArray();

        for (int i = 0; i < taskArray.Length - 1; i++) { // -1: the last task, i.e. the most recent task, shall be run and not canceled. 
            // Cancel all running tasks which were started by previous calls of this method.
            // Only tasks whose status is exactly Running are cancelled and awaited;
            // tasks in any other state are skipped.
            if (taskArray[i].Status == TaskStatus.Running) {
                tokenSourceArray[i].Cancel();
                await taskArray[i]; // wait till the canceling completed
            }
        }

        // Get the most recent task
        Task currentThreadToRun = taskArray[taskArray.Length - 1];

        // Start this task if, but only if it has not been started before (i.e. if it is still in Created state). 
        if (currentThreadToRun.Status == TaskStatus.Created) {
            currentThreadToRun.Start();
            await currentThreadToRun; // wait till this task is completed.
        }

        // Now the task has been completed once. Thus we can reset the list of tasks to cancel or maybe run.
        _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>();
    }
于 2017-10-24T07:31:19.407 回答
0

希望这会有用 - 尝试创建可以重复使用的 Helper 类:

class SelfCancelRestartTask
{
    private Task _task = null;
    public CancellationTokenSource TokenSource { get; set; } = null;

    public SelfCancelRestartTask()
    {
    }

    /// <summary>
    /// Cancels the operation that is currently running (if any), waits for it to
    /// finish, then starts <paramref name="operation"/> on the thread pool with a
    /// fresh <see cref="TokenSource"/>.
    /// </summary>
    /// <param name="operation">The work to run; it should observe <c>TokenSource.Token</c> to be cancellable.</param>
    /// <returns>A task that completes once the new operation has been started (not once it has finished).</returns>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
    public async Task Run(Action operation)
    {
        if (operation == null)
            throw new ArgumentNullException(nameof(operation));

        // Loop rather than check once: several callers may be waiting on the same
        // old task, and whichever resumes first starts a new one. Later callers
        // must cancel and await that one too, so only one operation ever runs.
        while (true)
        {
            var previous = this._task;

            // IsCompleted is already true for canceled and faulted tasks.
            if (previous == null || previous.IsCompleted)
                break;

            TokenSource?.Cancel();
            try
            {
                await previous;
            }
            catch (OperationCanceledException)
            {
                // Expected: the previous operation ended because it was just
                // cancelled. That must not prevent the restart.
            }
        }

        TokenSource = new CancellationTokenSource();
        this._task = Task.Run(operation, TokenSource.Token);
    }
于 2017-09-14T20:45:35.510 回答