2

我有一个应用程序定期启动即发即弃任务,主要用于记录目的,我的问题是当应用程序关闭时,任何当前正在运行的即发即弃任务都会中止。我想防止这种情况发生,所以我正在寻找一种机制,允许我await在关闭我的应用程序之前完成所有正在运行的即发即弃操作。我不想处理他们可能出现的异常,我不在乎这些。我只是想让他们有机会完成(可能会超时,但这不是问题的一部分)。

你可能会争辩说,这个要求使我的任务不是真正的一劳永逸,这有一些道理,所以我想澄清这一点:

  1. 这些任务在本地是一劳永逸的,因为启动它们的方法对它们的结果不感兴趣。
  2. 这些任务不是全局性的,因为整个应用程序都关心它们。

这是该问题的最小演示:

static class Program
{
    static async Task Main(string[] args)
    {
        _ = Log("Starting"); // fire and forget
        await Task.Delay(1000); // Simulate the main asynchronous workload
        CleanUp();
        _ = Log("Finished"); // fire and forget

        // Here any pending fire and forget operations should be awaited somehow
    }

    private static void CleanUp()
    {
        _ = Log("CleanUp started"); // fire and forget
        Thread.Sleep(200); // Simulate some synchronous operation
        _ = Log("CleanUp completed"); // fire and forget
    }

    private static async Task Log(string message)
    {
        await Task.Delay(100); // Simulate an async I/O operation required for logging
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {message}");
    }
}

输出:

11:14:11.441 Starting
11:14:12.484 CleanUp started
Press any key to continue . . .

和条目没有被记录,因为应用程序提前终止,并且挂起的任务被中止"CleanUp completed""Finished"有什么方法可以在关闭前等待他们完成吗?

顺便说一句,这个问题的灵感来自@SHAFEESPS最近提出的一个问题,遗憾的是,该问题因不清楚而被关闭。

澄清:上面给出的最小示例包含一种类型的即发即弃操作,即Task Log方法。现实世界应用程序发起的即发即弃操作是多种多样的。有些甚至返回通用任务,例如Task<string>or Task<int>

一个即发即弃的任务也有可能触发次要的即发即弃任务,这些任务也应该被允许启动并被等待。

4

2 回答 2

2

也许像柜台一样等待退出?这仍然会很火,然后忘记。

我只是转移LogAsync到它自己的方法,因为每次调用 Log 时都不需要丢弃。我想它还可以处理在程序退出时调用 Log 时会发生的微小竞争条件。

public class Program
{
    static async Task Main(string[] args)
    {
        Log("Starting"); // fire and forget
        await Task.Delay(1000); // Simulate the main asynchronous workload
        CleanUp();
        Log("Finished"); // fire and forget

        // Here any pending fire and forget operations should be awaited somehow
        var spin = new SpinWait();

        while (_backgroundTasks > 0)
        {
            spin.SpinOnce();
        }
    }

    private static void CleanUp()
    {
        Log("CleanUp started"); // fire and forget
        Thread.Sleep(200); // Simulate some synchronous operation
        Log("CleanUp completed"); // fire and forget
    }

    private static int _backgroundTasks;

    private static void Log(string message)
    {
        Interlocked.Increment(ref _backgroundTasks);
        _ = LogAsync(message);
    }

    private static async Task LogAsync(string message)
    {
        await Task.Delay(100); // Simulate an async I/O operation required for logging
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {message}");
        Interlocked.Decrement(ref _backgroundTasks);
    }
}
于 2020-11-07T09:46:10.600 回答
2

一件合理的事情是在您的记录器中有内存队列(这适用于与您的条件匹配的其他类似功能),这是单独处理的。然后你的 log 方法就变成了这样:

private static readonly BlockingCollection<string> _queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
public static void Log(string message) {
   _queue.Add(message);
}

它对调用者来说非常快速且无阻塞,并且在某种意义上是异步的,它在未来某个时间完成(或失败)。调用者不知道也不关心结果,所以这是一个即发即弃的任务。

但是,这个队列是单独处理的(通过将日志消息插入最终目的地,如文件或数据库),可能在一个单独的线程中,或者通过等待(和线程池线程),这无关紧要。

然后在应用程序退出之前,您只需要通知队列处理器不需要更多项目,并等待它完成。例如:

_queue.CompleteAdding(); // no more items
_processorThread.Join(); // if you used separate thread, otherwise some other synchronization construct.

编辑:如果您希望队列处理是异步的 - 您可以使用此AsyncCollection(可作为 nuget 包使用)。然后你的代码变成:

class Program {
    private static Logger _logger;
    static async Task Main(string[] args) {
        _logger = new Logger();
        _logger.Log("Starting"); // fire and forget
        await Task.Delay(1000); // Simulate the main asynchronous workload
        CleanUp();
        _logger.Log("Finished"); // fire and forget
        await _logger.Stop();
        // Here any pending fire and forget operations should be awaited somehow
    }

    private static void CleanUp() {
        _logger.Log("CleanUp started"); // fire and forget
        Thread.Sleep(200); // Simulate some synchronous operation
        _logger.Log("CleanUp completed"); // fire and forget
    }
}

class Logger {
    private readonly AsyncCollection<string> _queue = new AsyncCollection<string>(new ConcurrentQueue<string>());
    private readonly Task _processorTask;
    public Logger() {
        _processorTask = Process();
    }

    public void Log(string message) {
        // synchronous adding, you can also make it async via 
        // _queue.AddAsync(message); but I see no reason to
        _queue.Add(message);
    }

    public async Task Stop() {
        _queue.CompleteAdding();
        await _processorTask;
    }

    private async Task Process() {
        while (true) {
            string message;
            try {
                message = await _queue.TakeAsync();
            }
            catch (InvalidOperationException) {
                // throws this exception when collection is empty and CompleteAdding was called
                return;
            }

            await Task.Delay(100);
            Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {message}");
        }
    }
}

或者您可以像通常那样使用单独的专用线程来同步处理项目。

编辑 2:这里是引用计数的变体,它不对“即发即弃”任务的性质做出任何假设:

static class FireAndForgetTasks {

    // start with 1, in non-signaled state
    private static readonly CountdownEvent _signal = new CountdownEvent(1);

    public static void AsFireAndForget(this Task task) {
        // add 1 for each task
        _signal.AddCount();
        task.ContinueWith(x => {
            if (x.Exception != null) {
                // do something, task has failed, maybe log 
            }
            // decrement 1 for each task, it cannot reach 0 and become signaled, because initial count was 1
            _signal.Signal();
        });
    }

    public static void Wait(TimeSpan? timeout = null) {
        // signal once. Now event can reach zero and become signaled, when all pending tasks will finish
        _signal.Signal();
        // wait on signal
        if (timeout != null)
            _signal.Wait(timeout.Value);
        else
            _signal.Wait();
        // dispose the signal
        _signal.Dispose();
    }
}

您的样本变为:

static class Program {
    static async Task Main(string[] args) {
        Log("Starting").AsFireAndForget(); // fire and forget
        await Task.Delay(1000); // Simulate the main asynchronous workload
        CleanUp();
        Log("Finished").AsFireAndForget(); // fire and forget
        FireAndForgetTasks.Wait();
        // Here any pending fire and forget operations should be awaited somehow
    }

    private static void CleanUp() {
        Log("CleanUp started").AsFireAndForget(); // fire and forget
        Thread.Sleep(200); // Simulate some synchronous operation
        Log("CleanUp completed").AsFireAndForget(); // fire and forget
    }

    private static async Task Log(string message) {
        await Task.Delay(100); // Simulate an async I/O operation required for logging
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {message}");
    }
}
于 2020-11-07T11:12:03.617 回答