1

我有来自多个线程的订单,我想在一个线程中处理这些数据。如果我理解正确,那么可以使用 ConcurrentQueue。

我看了 SO question How to work threading with ConcurrentQueue<T>,但它没有回答我的问题。

我编写了一个小型测试应用程序(使用 .NET Core 2.1),看看是否可以让它工作。

这就是它应该做的:为 100 个订单进行聚合。3 种不同的订单类型有 3 种聚合:Type1、Type2 和 Type3

输出应该是这样的:

Type: Type1 Count: 38
Type: Type2 Count: 31
Type: Type3 Count: 31
Total for all types: 100

我开始编写没有 ConcurrentQueue 的应用程序。正如预期的那样,_aggregates 中的结果是错误的。

/* Incorrect version, not using ConcurrentQueue, showing incorrect results */

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;

namespace ConcurrentQueue
{
    class Program
    {
        private static readonly int OrderCount = 100;

        private static IEnumerable<Order> _orders;

        private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

        static void Main(string[] args)
        {
            //Prepare
            InitializeAggregates();
            _orders = CreateOrders();

            //Execute
            MainAsync(args).GetAwaiter().GetResult();
        }

        static async Task MainAsync(string[] args)
        {
            await Task.Run(() => ProcessOrders());

            ShowOutput();
        }

        public static void ProcessOrders()
        {
            var aggregator = new Aggregator();
            Parallel.ForEach(_orders, order => {
                aggregator.Aggregate(order, _aggregates);
            });
        }

        private static IEnumerable<Order> CreateOrders()
        {
            var orderList = new Collection<Order>();

            for (var i = 1; i <= OrderCount; i++)
            {
                var order = CreateOrder(i);
                orderList.Add(order);
            }

            return orderList;
        }

        private static void InitializeAggregates()
        {
            _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
            _aggregates[OrderTypeEnum.Type1] = new Aggregate();
            _aggregates[OrderTypeEnum.Type2] = new Aggregate();
            _aggregates[OrderTypeEnum.Type3] = new Aggregate();
        }

        private static Order CreateOrder(int id)
        {
            var order = new Order() { Id = id, OrderType = GetRandomAggregtationType() };
            return order;
        }

        private static OrderTypeEnum GetRandomAggregtationType()
        {
            Array values = Enum.GetValues(typeof(OrderTypeEnum));
            var random = new Random();
            return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
        }

        private static void ShowOutput()
        {
            Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
            var total =
                _aggregates[OrderTypeEnum.Type1].Count +
                _aggregates[OrderTypeEnum.Type2].Count +
                _aggregates[OrderTypeEnum.Type3].Count;
            Console.WriteLine($"Total for all types: {total}");
            Console.ReadKey();
        }
    }

    public class Order
    {
        public int Id { get; set; }
        public OrderTypeEnum OrderType { get; set; }
    }

    public class Aggregator
    {
        public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
        {
            aggregates[order.OrderType].Count++;
        }
    }

    public class Aggregate
    {
        public int Count { get; set; }
    }

    public enum OrderTypeEnum
    {
        Type1 = 1,
        Type2 = 2,
        Type3 = 3
    }
}

所以我使用 ConcurrentQueue 重写了应用程序。结果现在是正确的,但我觉得我做错了,或者可以更有效地完成。

/* improved version using ConcurrentQueue, showing correct results */

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;

namespace ConcurrentQueue
{
    class Program
    {
        private static readonly int OrderCount = 100;

        private static IEnumerable<Order> _orders;

        private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

        static void Main(string[] args)
        {
            //Prepare
            InitializeAggregates();
            _orders = CreateOrders();

            //Execute
            var proxy = new OrderProxy();
            var ordersQueue = new ConcurrentQueue<OrderResult>();
            Parallel.ForEach(_orders, order => {
                var orderResult = proxy.PlaceOrder(order);
                ordersQueue.Enqueue(orderResult);
            });

            foreach (var order in ordersQueue)
            {
                _aggregates[order.OrderType].Count++;
            }

            ShowOutput();
        }

        private static IEnumerable<Order> CreateOrders()
        {
            var orderList = new Collection<Order>();

            for (var i = 1; i <= OrderCount; i++)
            {
                var order = CreateOrder(i);
                orderList.Add(order);
            }

            return orderList;
        }

        private static void InitializeAggregates()
        {
            _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
            _aggregates[OrderTypeEnum.Type1] = new Aggregate();
            _aggregates[OrderTypeEnum.Type2] = new Aggregate();
            _aggregates[OrderTypeEnum.Type3] = new Aggregate();
        }

        private static Order CreateOrder(int id)
        {
            var order = new Order() { Id = id, AggregateType = GetRandomAggregtationType() };
            return order;
        }

        private static OrderTypeEnum GetRandomAggregtationType()
        {
            Array values = Enum.GetValues(typeof(OrderTypeEnum));
            var random = new Random();
            return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
        }

        private static void ShowOutput()
        {
            Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
            var total =
                _aggregates[OrderTypeEnum.Type1].Count +
                _aggregates[OrderTypeEnum.Type2].Count +
                _aggregates[OrderTypeEnum.Type3].Count;
            Console.WriteLine($"Total for all types: {total}");
            Console.ReadKey();
        }
    }

    public class Order
    {
        public int Id { get; set; }
        public OrderTypeEnum AggregateType { get; set; }
    }

    public class OrderResult
    {
        public int Id { get; set; }
        public OrderTypeEnum OrderType { get; set; }
    }

    public class OrderProxy
    {
        public OrderResult PlaceOrder(Order order)
        {
            var orderResult = new OrderResult() { Id = order.Id, OrderType = order.AggregateType };
            return orderResult;
        }
    }

    public class Aggregate
    {
        public OrderTypeEnum OrderType { get; set; }
        public int Count { get; set; }
    }

    public enum OrderTypeEnum
    {
        Type1 = 1,
        Type2 = 2,
        Type3 = 3
    }
}

如您所见,我将 OrderResult 类型的对象添加到 ConcurrentQueue。我不需要使用 OrderResult 类。当然,我可以将订单添加到队列中,并在完成检索数据后遍历它们并计算总和。那是我应该做的吗?我只是想处理传入的订单,并立即计算不同类型的订单并将它们存储在我的“聚合集合”中。那可能吗?如果是,如何?

4

3 回答 3

1

正如大卫福勒本人所建议的那样,我尝试使用 System.Threading.Channels 来解决我的问题,并且我能够想出一些似乎可以正常工作的东西。

库 System.Threading.Channels 的文档记录很差,所以我希望我所做的是它应该做的方式。

using System;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Threading;
using System.Collections.Generic;

namespace ConcurrentQueue
{
    class Program
    {
        //Buffer for writing. After the capacity has been reached, a read must take place because the channel is full.
        private static readonly int Capacity = 10; 

        //Number of orders to write by each writer. (Choose 0 for infinitive.)
        private static readonly int NumberOfOrdersToWrite = 25;

        //Delay in ms used 
        private static readonly int Delay = 50;

        private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

        static void Main(string[] args)
        {
            //Prepare
            InitializeAggregates();

            MainAsync(args).GetAwaiter().GetResult();
        }

        static async Task MainAsync(string[] args)
        {
            var channel = Channel.CreateBounded<Order>(Capacity);

            var readerTask = Task.Run(() => ReadFromChannelAsync(channel.Reader));

            var writerTask01 = Task.Run(() => WriteToChannelAsync(channel.Writer, 1, NumberOfOrdersToWrite));
            var writerTask02 = Task.Run(() => WriteToChannelAsync(channel.Writer, 2, NumberOfOrdersToWrite));
            var writerTask03 = Task.Run(() => WriteToChannelAsync(channel.Writer, 3, NumberOfOrdersToWrite));
            var writerTask04 = Task.Run(() => WriteToChannelAsync(channel.Writer, 4, NumberOfOrdersToWrite));

            while (!writerTask01.IsCompleted || !writerTask02.IsCompleted || !writerTask03.IsCompleted || !writerTask04.IsCompleted)
            {
            }

            channel.Writer.Complete();

            await channel.Reader.Completion;

            ShowOutput();
        }

        public static async Task WriteToChannelAsync(ChannelWriter<Order> writer, int writerNumber, int numberOfOrdersToWrite = 0)
        {
            int i = 1;
            while (numberOfOrdersToWrite == 0 || i <= numberOfOrdersToWrite)
            {

                var order = CreateOrder(writerNumber, i);
                await writer.WriteAsync(order);
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: writer {writerNumber} just wrote order {order.OrderNumber} with value {order.OrderType}.");
                i++;
                //await Task.Delay(Delay);  //this simulates some work...
            }
        }

        private static async Task ReadFromChannelAsync(ChannelReader<Order> reader)
        {
            while (await reader.WaitToReadAsync())
            {
                while (reader.TryRead(out Order order))
                {
                    Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: reader just read order {order.OrderNumber} with value {order.OrderType}.");
                    _aggregates[order.OrderType].Count++;
                    await Task.Delay(Delay);  //this simulates some work...
                }
            }
        }

        private static void InitializeAggregates()
        {
            _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
            _aggregates[OrderTypeEnum.Type1] = new Aggregate();
            _aggregates[OrderTypeEnum.Type2] = new Aggregate();
            _aggregates[OrderTypeEnum.Type3] = new Aggregate();
        }

        private static Order CreateOrder(int writerNumber, int seq)
        {
            string orderNumber = $"{writerNumber}-{seq}";
            var order = new Order() { OrderNumber = orderNumber, OrderType = GetRandomOrderType() };
            return order;
        }

        private static OrderTypeEnum GetRandomOrderType()
        {
            Array values = Enum.GetValues(typeof(OrderTypeEnum));
            var random = new Random();
            return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
        }

        private static void ShowOutput()
        {
            var total =
                _aggregates[OrderTypeEnum.Type1].Count +
                _aggregates[OrderTypeEnum.Type2].Count +
                _aggregates[OrderTypeEnum.Type3].Count;

            Console.WriteLine();
            Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
            Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
            Console.WriteLine($"Total for all types: {total}");
            Console.WriteLine();
            Console.WriteLine("Done! Press a key to close the window.");
            Console.ReadKey();
        }
    }

    public class Order
    {
        public string OrderNumber { get; set; }
        public OrderTypeEnum OrderType { get; set; }
    }

    public class Aggregator
    {
        public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
        {
             aggregates[order.OrderType].Count++;
        }
    }

    public class Aggregate
    {
        public OrderTypeEnum OrderType { get; set; }
        public int Count { get; set; }
    }

    public enum OrderTypeEnum
    {
        Type1 = 1,
        Type2 = 2,
        Type3 = 3
    }
}

我不喜欢检查作者完成情况的方式。如何改善这一点?

while (!writerTask01.IsCompleted || !writerTask02.IsCompleted ||
       !writerTask03.IsCompleted || !writerTask04.IsCompleted)
{
}

任何反馈都非常感谢。

于 2018-07-24T13:02:08.413 回答
0

正如 Shayne 所建议的,使用 lock 语句(在我的第一个代码示例中)确实有效:

public class Aggregator
{
    private static readonly Object _lockObj = new Object();

    public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
    {
        lock (_lockObj)
        {
            aggregates[order.OrderType].Count++;
        }
    }
}

我认为 DataFlow 和 System.Threading.Channels 是更灵活、更优雅的解决方案。

于 2018-07-24T11:52:43.710 回答
0

您使用的第二个解决方案ConcurrentQueue<T>实际上并没有同时进行聚合。它只是同时将项目添加到队列中,然后按顺序处理队列。对于这个特定的示例代码,最简单的解决方案是使用您想出的第一个解决方案,除了方法lock中的增量Aggregator.Aggregate,如下所示:

public class Aggregator
{
    public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
    {
        var aggregate = aggregates[order.OrderType];
        Interlocked.Increment(ref aggregate.Count);
    }
}
于 2018-07-21T11:28:16.217 回答