我正在将消息添加到线程通道并不断从该通道读取。我注意到如果我不添加 aTask.Delay
那么所有的消息都不会被处理。当它应该是 1000 时,程序将退出可能处理 10 条消息。
每次写入后添加一个Task.Delay
似乎很hacky。有没有更好的方法来读取频道中的所有消息,而无需Task.Delay
在每次写入后添加一个?
我的StartListener()
方法有问题吗?
internal class Program
{
static List<Task> taskList = new List<Task>();
private static Channel<string> messageList = Channel.CreateUnbounded<string>();
static int midpointCount = 0;
static void Main(string[] args)
{
Stopwatch sw = new Stopwatch();
sw.Start();
Task.WhenAll(Task.Run(() => StartListener()));
for (int i = 0; i < 10; i++)
{
int task = i;
taskList.Add(Task.Run(() => StartJob(task)));
}
Task.WaitAll(taskList.ToArray());
sw.Stop();
Console.WriteLine("Total Messages Processed: {0} in time {1} MessageListCount {2}", midpointCount, sw.Elapsed, messageList.Reader.Count);
}
private static async Task<string> StartListener()
{
var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;
await foreach (var msg in messageList.Reader.ReadAllAsync(cancellationtoken))
{
if (!string.IsNullOrEmpty(msg))
{
Console.WriteLine(msg);
Interlocked.Increment(ref midpointCount);
}
}
return "Finished";
}
private static async Task<string> StartJob(int TaskNum)
{
Random rnd = new Random();
for (int i = 0; i < 100; i++)
{
var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;
try
{
var message = string.Format("TaskNum {0} Message added #{1}", TaskNum, rnd.Next(1, 3000));
await messageList.Writer.WriteAsync(message);
await Task.Delay(50); //<--- Here it seems it will only read all messages with a delay involved.
}
catch (OperationCanceledException)
{
// ignored
}
}
return "Finished";
}
}