1

这篇文章之后,我一直在玩System.Threading.Channel以获得足够的信心并在我的生产代码中使用它,替换Threads/Monitor.Pulse/Wait我目前使用的基于方法(在引用的帖子中描述)。

基本上,我创建了一个有界通道的示例,在其中我在开始时运行了几个生产者任务,并且无需等待,就启动了我的消费者任务,这些任务开始从通道中推送元素。在等待生产者任务完成后,我会向通道发出完成信号,因此消费者任务可以退出监听新的通道元素。

我的频道是 a Channel<Action>,在每个操作中,我都会增加WorkDistribution并发字典中每个给定工作人员的计数,并在示例结束时打印它,这样我就可以检查我消耗了多少我预期的项目,以及如何渠道在消费者之间分配动作。

由于某种原因,此“工作分配页脚”打印的项目数量与生产者任务生产的项目总数不同。

我错过了什么?添加的一些变量的唯一目的是帮助排除故障。

这是完整的代码:

public class ChannelSolution
{
    object LockObject = new object();
    Channel<Action<string>> channel;
    int ItemsToProduce;
    int WorkersCount;
    int TotalItemsProduced;
    ConcurrentDictionary<string, int> WorkDistribution;
    CancellationToken Ct;
    public ChannelSolution(int workersCount, int itemsToProduce, int maxAllowedItems,
        CancellationToken ct)
    {
        WorkersCount = workersCount;
        ItemsToProduce = itemsToProduce;
        channel = Channel.CreateBounded<Action<string>>(maxAllowedItems);
        Console.WriteLine($"Created channel with max {maxAllowedItems} items");
        WorkDistribution = new ConcurrentDictionary<string, int>();
        Ct = ct;
    }

    async Task ProduceItems(int cycle)
    {
        for (var i = 0; i < ItemsToProduce; i++)
        {
            var index = i + 1 + (ItemsToProduce * cycle);
            bool queueHasRoom;
            var stopwatch = new Stopwatch();
            stopwatch.Start();
            do
            {
                if (Ct.IsCancellationRequested)
                {
                    Console.WriteLine("exiting read loop - cancellation requested !");
                    break;
                }
                queueHasRoom = await channel.Writer.WaitToWriteAsync();
                if (!queueHasRoom)
                {
                    if (Ct.IsCancellationRequested)
                    {
                        Console.WriteLine("exiting read loop - cancellation"
                            + " requested !");
                        break;
                    }

                    if (stopwatch.Elapsed.Seconds % 3 == 0)
                        Console.WriteLine("Channel reached maximum capacity..."
                            + " producer waiting for items to be freed...");
                }
            }
            while (!queueHasRoom);
            channel.Writer.TryWrite((workerName) => action($"A{index}", workerName));
            Console.WriteLine($"Channel has room, item {index} added"
                + $" - channel items count: [{channel.Reader.Count}]");
            Interlocked.Increment(ref TotalItemsProduced);
        }
    }

    List<Task> GetConsumers()
    {
        var tasks = new List<Task>();
        for (var i = 0; i < WorkersCount; i++)
        {
            var workerName = $"W{(i + 1).ToString("00")}";
            tasks.Add(Task.Run(async () =>
            {
                while (await channel.Reader.WaitToReadAsync())
                {
                    if (Ct.IsCancellationRequested)
                    {
                        Console.WriteLine("exiting write loop - cancellation"
                            + "requested !");
                        break;
                    }

                    if (channel.Reader.TryRead(out var action))
                    {
                        Console.WriteLine($"dequed action in worker [{workerName}]");
                        action(workerName);
                    }
                }
            }));
        }

        return tasks;
    }

    void action(string actionNumber, string workerName)
    {
        Console.WriteLine($"processing {actionNumber} in worker {workerName}...");
        var secondsToWait = new Random().Next(2, 5);
        Thread.Sleep(TimeSpan.FromSeconds(secondsToWait));
        Console.WriteLine($"action {actionNumber} completed by worker {workerName}"
            + $" after {secondsToWait} secs! channel items left:"
            + $" [{channel.Reader.Count}]");

        if (WorkDistribution.ContainsKey(workerName))
        {
            lock (LockObject)
            {
                WorkDistribution[workerName]++;
            }
        }
        else
        {
            var succeeded = WorkDistribution.TryAdd(workerName, 1);
            if (!succeeded)
            {
                Console.WriteLine($"!!! failed incremeting dic value !!!");
            }

        }
    }

    public void Summarize(Stopwatch stopwatch)
    {
        Console.WriteLine("--------------------------- Thread Work Distribution "
            + "------------------------");
        foreach (var kv in this.WorkDistribution)
            Console.WriteLine($"thread: {kv.Key} items consumed: {kv.Value}");

        Console.WriteLine($"Total actions consumed: "
            + $"{WorkDistribution.Sum(w => w.Value)} - Elapsed time: "
            + $"{stopwatch.Elapsed.Seconds} secs");

    }

    public void Run(int producerCycles)
    {
        var stopwatch = new Stopwatch();
        stopwatch.Start();
        var producerTasks = new List<Task>();

        Console.WriteLine($"Started running at {DateTime.Now}...");
        for (var i = 0; i < producerCycles; i++)
        {
            producerTasks.Add(ProduceItems(i));
        }
        var consumerTasks = GetConsumers();
        Task.WaitAll(producerTasks.ToArray());
        Console.WriteLine($"-------------- Completed waiting for PRODUCERS -"
            + " total items produced: [{TotalItemsProduced}] ------------------");
        channel.Writer.Complete(); //just so I can complete this demo

        Task.WaitAll(consumerTasks.ToArray());
        Console.WriteLine("----------------- Completed waiting for CONSUMERS "
            + "------------------");
        //Task.WaitAll(GetConsumers().Union(producerTasks/*.Union(
        //    new List<Task> { taskKey })*/).ToArray());
        //Console.WriteLine("Completed waiting for tasks");

        Summarize(stopwatch);
    }
}

这是 Program.cs 中的调用代码

var workersCount = 5;
var itemsToProduce = 10;
var maxItemsInQueue = 5;
var cts = new CancellationTokenSource();
var producerConsumerTests = new ProducerConsumerTests(workersCount, itemsToProduce,
    maxItemsInQueue, cts.Token);
producerConsumerTests.Run(2);
4

1 回答 1

3

快速浏览一下,该ProduceItems方法中存在一个围绕queueHasRoom变量的竞争条件。你不需要这个变量。该channel.Writer.TryWrite方法将告诉您通道缓冲区中是否有空间。或者,您可以简单地使用awaitWriteAsync方法,而不是使用WaitToWriteAsync/TryWrite组合。AFAIK 这个组合旨在作为前一种方法的性能优化。如果您在尝试发布值之前绝对需要知道是否有可用空间,那么Channel<T>它可能不适合您的用例。您需要在“检查可用空间->创建值->后值”的整个操作过程中找到可以锁定的东西,以便可以使该操作成为原子操作。

作为旁注,使用锁来保护更新ConcurrentDictionary是多余的。ConcurrentDictionary提供了一种方法,该AddOrUpdate方法可以用另一个值原子地替换它包含的值。如果字典包含可变对象,您可能必须锁定,并且您需要以线程安全的方式改变这些对象。但是在您的情况下,这些值是 type Int32,这是一个不可变的结构。您无需更改它,只需将其替换为 new Int32,它是基于现有值创建的:

WorkDistribution.AddOrUpdate(workerName, 1, (_, existing) => existing + 1);
于 2021-01-27T13:10:50.513 回答