0

我正在将消息添加到线程通道并不断从该通道读取。我注意到如果我不添加 aTask.Delay那么所有的消息都不会被处理。当它应该是 1000 时,程序将退出可能处理 10 条消息。

每次写入后添加一个Task.Delay似乎很hacky。有没有更好的方法来读取频道中的所有消息,而无需Task.Delay在每次写入后添加一个?

我的StartListener()方法有问题吗?

internal class Program
{
    static List<Task> taskList = new List<Task>();
    private static Channel<string> messageList = Channel.CreateUnbounded<string>();
    static int midpointCount = 0;
    static void Main(string[] args)
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        Task.WhenAll(Task.Run(() => StartListener()));
        for (int i = 0; i < 10; i++)
        {
            int task = i;
            taskList.Add(Task.Run(() => StartJob(task)));
        }            
        Task.WaitAll(taskList.ToArray());
        sw.Stop();
        Console.WriteLine("Total Messages Processed: {0} in time {1} MessageListCount {2}", midpointCount, sw.Elapsed, messageList.Reader.Count);
    }

    private static async Task<string> StartListener()
    {
        var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;
        await foreach (var msg in messageList.Reader.ReadAllAsync(cancellationtoken))
        {
            if (!string.IsNullOrEmpty(msg))
            {
                Console.WriteLine(msg);
                Interlocked.Increment(ref midpointCount);
            }
        }           
        return "Finished";
    }

    private static async Task<string> StartJob(int TaskNum)
    {
        Random rnd = new Random();
        for (int i = 0; i < 100; i++)
        {
            var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;
            try
            {                    
                var message = string.Format("TaskNum {0} Message added #{1}", TaskNum, rnd.Next(1, 3000));
                await messageList.Writer.WriteAsync(message);
                await Task.Delay(50);  //<--- Here it seems it will only read all messages with a delay involved.
            }
            catch (OperationCanceledException)
            {
                // ignored
            }
        }           
        return "Finished";
    }
}
4

1 回答 1

2
Task.WhenAll(Task.Run(() => StartListener()));

StartListener返回一个Task。您将其包装在 中Task.Run,启动另一个线程来运行该任务。然后,您将任务传递给方法Task.WhenAll该方法返回Task您立即丢弃的 a。

您添加到taskList变量的唯一任务是StartJob任务。Main一旦所有任务完成,您的方法就会完成StartJob。它不会等待StartListener任务完成。

更改您的代码以等待侦听器完成。

static void Main(string[] args)
{
    Stopwatch sw = new Stopwatch();
    sw.Start();
    
    taskList.Add(Task.Run(() => StartListener()));
    
    for (int i = 0; i < 10; i++)
    {
        int task = i;
        taskList.Add(Task.Run(() => StartJob(task)));
    }

     Task.WaitAll(taskList.ToArray());
     sw.Stop();
     
     Console.WriteLine("Total Messages Processed: {0} in time {1} MessageListCount {2}", 
        midpointCount, sw.Elapsed, messageList.Reader.Count);
}
于 2022-01-17T16:05:11.897 回答