我有来自多个线程的订单,我想在一个线程中处理这些数据。如果我理解正确,那么可以使用 ConcurrentQueue。
我看过 Stack Overflow 上的问题“How to work threading with ConcurrentQueue<T>”,但它没有解答我的疑问。
我编写了一个小型测试应用程序(使用 .NET Core 2.1),看看是否可以让它工作。
它应该完成的工作是:对 100 个订单进行聚合。订单共有 3 种类型(Type1、Type2 和 Type3),每种类型对应一个聚合。
输出应该是这样的:
Type: Type1 Count: 38
Type: Type2 Count: 31
Type: Type3 Count: 31
Total for all types: 100
我先编写了一个不使用 ConcurrentQueue 的版本。不出所料,_aggregates 中的结果是错误的。
/* Incorrect version, not using ConcurrentQueue, showing incorrect results */
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
namespace ConcurrentQueue
{
/// <summary>
/// Console demo: creates <c>OrderCount</c> orders with random types, counts them
/// per type from a <c>Parallel.ForEach</c> loop, and prints the counts.
/// </summary>
/// <remarks>
/// The per-type counters live in a plain <c>Dictionary</c> and are incremented
/// from parallel iterations without any synchronization. The header comment of
/// this listing labels it as the version that shows incorrect results, so the
/// aggregation path is deliberately left unsynchronized here.
/// </remarks>
class Program
{
    private static readonly int OrderCount = 100;

    // Enum members in ascending value order (Type1, Type2, Type3); resolved once
    // instead of calling Enum.GetValues for every order.
    private static readonly OrderTypeEnum[] OrderTypes =
        (OrderTypeEnum[])Enum.GetValues(typeof(OrderTypeEnum));

    // One generator for the whole run instead of a new Random per order.
    // Random is not thread-safe; it is only used while the orders are created,
    // which happens on the main thread before any parallel work starts.
    private static readonly Random Rng = new Random();

    private static IEnumerable<Order> _orders;
    private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

    /// <summary>Entry point: prepares the data, then runs the aggregation and prints the result.</summary>
    static void Main(string[] args)
    {
        //Prepare
        InitializeAggregates();
        _orders = CreateOrders();

        //Execute
        MainAsync(args).GetAwaiter().GetResult();
    }

    /// <summary>Runs <see cref="ProcessOrders"/> on a thread-pool thread and prints the counts once it has finished.</summary>
    static async Task MainAsync(string[] args)
    {
        await Task.Run(() => ProcessOrders());
        ShowOutput();
    }

    /// <summary>
    /// Feeds every order to a single <c>Aggregator</c> from a parallel loop.
    /// </summary>
    /// <remarks>
    /// All iterations share <c>_aggregates</c>; nothing here serializes the
    /// increments, which is what this listing is meant to demonstrate.
    /// </remarks>
    public static void ProcessOrders()
    {
        var aggregator = new Aggregator();
        Parallel.ForEach(_orders, order => aggregator.Aggregate(order, _aggregates));
    }

    /// <summary>Builds the orders with ids 1..<c>OrderCount</c>, each with a random type.</summary>
    private static IEnumerable<Order> CreateOrders()
    {
        var orderList = new Collection<Order>();
        for (var id = 1; id <= OrderCount; id++)
        {
            orderList.Add(CreateOrder(id));
        }
        return orderList;
    }

    /// <summary>Creates one zeroed <c>Aggregate</c> per order type.</summary>
    private static void InitializeAggregates()
    {
        _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
        foreach (var type in OrderTypes)
        {
            _aggregates[type] = new Aggregate();
        }
    }

    /// <summary>Creates an order with the given id and a randomly chosen type.</summary>
    private static Order CreateOrder(int id)
    {
        return new Order { Id = id, OrderType = GetRandomAggregtationType() };
    }

    /// <summary>Picks one of the declared order types uniformly at random.</summary>
    private static OrderTypeEnum GetRandomAggregtationType()
    {
        return OrderTypes[Rng.Next(OrderTypes.Length)];
    }

    /// <summary>Prints the count per order type and the grand total, then waits for a key press.</summary>
    private static void ShowOutput()
    {
        var total = 0;
        foreach (var type in OrderTypes)
        {
            var count = _aggregates[type].Count;
            Console.WriteLine($"Type: {type} Count: {count}");
            total += count;
        }
        Console.WriteLine($"Total for all types: {total}");
        Console.ReadKey();
    }
}
/// <summary>An order in the demo: an id plus the type it is counted under.</summary>
public class Order
{
/// <summary>Identifier of the order.</summary>
public int Id { get; set; }
/// <summary>Type of the order; <c>Aggregator.Aggregate</c> uses it to pick the counter to increment.</summary>
public OrderTypeEnum OrderType { get; set; }
}
/// <summary>Adds orders to the per-type counters.</summary>
public class Aggregator
{
    /// <summary>
    /// Increments the counter registered for the order's type.
    /// </summary>
    /// <param name="order">Order to count; its <c>OrderType</c> selects the counter.</param>
    /// <param name="aggregates">Counters keyed by order type; must contain an entry for the order's type.</param>
    /// <remarks>
    /// The increment is a separate read and write of <c>Count</c> with no locking,
    /// so concurrent calls for the same type can lose updates.
    /// </remarks>
    public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
    {
        Aggregate bucket = aggregates[order.OrderType];
        bucket.Count += 1;
    }
}
/// <summary>Running tally for one order type.</summary>
public class Aggregate
{
/// <summary>Number of orders counted so far. Plain auto-property, so incrementing it is a separate read and write.</summary>
public int Count { get; set; }
}
/// <summary>The order types the demo aggregates by; numbering starts at 1.</summary>
public enum OrderTypeEnum
{
    Type1 = 1,
    Type2,      // 2
    Type3,      // 3
}
}
于是我用 ConcurrentQueue 重写了应用程序。现在结果是正确的,但我感觉自己的做法不对,或者说还有更高效的实现方式。
/* improved version using ConcurrentQueue, showing correct results */
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
namespace ConcurrentQueue
{
/// <summary>
/// Console demo: places <c>OrderCount</c> randomly typed orders in parallel,
/// collects the results in a <c>ConcurrentQueue</c>, then counts them per type
/// on the main thread and prints the counts.
/// </summary>
class Program
{
    private static readonly int OrderCount = 100;

    // Enum members in ascending value order (Type1, Type2, Type3); resolved once
    // instead of calling Enum.GetValues for every order.
    private static readonly OrderTypeEnum[] OrderTypes =
        (OrderTypeEnum[])Enum.GetValues(typeof(OrderTypeEnum));

    // One generator for the whole run instead of a new Random per order.
    // Random is not thread-safe; it is only used while the orders are created,
    // which happens on the main thread before the parallel loop starts.
    private static readonly Random Rng = new Random();

    private static IEnumerable<Order> _orders;
    private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

    /// <summary>
    /// Entry point: prepares the data, places all orders in parallel, aggregates
    /// the queued results sequentially, and prints the counts.
    /// </summary>
    static void Main(string[] args)
    {
        //Prepare
        InitializeAggregates();
        _orders = CreateOrders();

        //Execute
        var proxy = new OrderProxy();
        var ordersQueue = new ConcurrentQueue<OrderResult>();

        // Producers: many threads enqueue concurrently; the queue is the only
        // shared state they touch.
        Parallel.ForEach(_orders, order => ordersQueue.Enqueue(proxy.PlaceOrder(order)));

        // Parallel.ForEach has returned, so every result is already queued and
        // the counters below are only ever touched by this one thread.
        foreach (var orderResult in ordersQueue)
        {
            _aggregates[orderResult.OrderType].Count++;
        }

        ShowOutput();
    }

    /// <summary>Builds the orders with ids 1..<c>OrderCount</c>, each with a random type.</summary>
    private static IEnumerable<Order> CreateOrders()
    {
        var orderList = new Collection<Order>();
        for (var id = 1; id <= OrderCount; id++)
        {
            orderList.Add(CreateOrder(id));
        }
        return orderList;
    }

    /// <summary>Creates one zeroed <c>Aggregate</c> per order type.</summary>
    private static void InitializeAggregates()
    {
        _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
        foreach (var type in OrderTypes)
        {
            _aggregates[type] = new Aggregate();
        }
    }

    /// <summary>Creates an order with the given id and a randomly chosen type.</summary>
    private static Order CreateOrder(int id)
    {
        return new Order { Id = id, AggregateType = GetRandomAggregtationType() };
    }

    /// <summary>Picks one of the declared order types uniformly at random.</summary>
    private static OrderTypeEnum GetRandomAggregtationType()
    {
        return OrderTypes[Rng.Next(OrderTypes.Length)];
    }

    /// <summary>Prints the count per order type and the grand total, then waits for a key press.</summary>
    private static void ShowOutput()
    {
        var total = 0;
        foreach (var type in OrderTypes)
        {
            var count = _aggregates[type].Count;
            Console.WriteLine($"Type: {type} Count: {count}");
            total += count;
        }
        Console.WriteLine($"Total for all types: {total}");
        Console.ReadKey();
    }
}
/// <summary>An order submitted to <c>OrderProxy.PlaceOrder</c>.</summary>
public class Order
{
/// <summary>Identifier of the order.</summary>
public int Id { get; set; }
/// <summary>Type the order is aggregated under; <c>OrderProxy.PlaceOrder</c> copies it into <c>OrderResult.OrderType</c>.</summary>
public OrderTypeEnum AggregateType { get; set; }
}
/// <summary>Result returned by <c>OrderProxy.PlaceOrder</c>; carries the id and type of the placed order.</summary>
public class OrderResult
{
/// <summary>Identifier copied from the placed order.</summary>
public int Id { get; set; }
/// <summary>Order type copied from the placed order's <c>AggregateType</c>.</summary>
public OrderTypeEnum OrderType { get; set; }
}
/// <summary>Stand-in for an order-placing service; it only maps an order to its result.</summary>
public class OrderProxy
{
    /// <summary>
    /// Produces the result for an order by copying its id and type.
    /// </summary>
    /// <param name="order">Order to place.</param>
    /// <returns>A new <c>OrderResult</c> with the order's <c>Id</c> and its <c>AggregateType</c> as <c>OrderType</c>.</returns>
    public OrderResult PlaceOrder(Order order)
    {
        return new OrderResult
        {
            Id = order.Id,
            OrderType = order.AggregateType,
        };
    }
}
/// <summary>Running tally for one order type.</summary>
public class Aggregate
{
/// <summary>
/// Order type associated with this aggregate.
/// NOTE(review): Program in this listing finds aggregates by dictionary key and does not read this property; confirm whether it is needed.
/// </summary>
public OrderTypeEnum OrderType { get; set; }
/// <summary>Number of orders counted so far.</summary>
public int Count { get; set; }
}
/// <summary>The order types the demo aggregates by; numbering starts at 1.</summary>
public enum OrderTypeEnum
{
    Type1 = 1,
    Type2,      // 2
    Type3,      // 3
}
}
如您所见,我把 OrderResult 类型的对象添加到了 ConcurrentQueue 中。其实我并不需要使用 OrderResult 类。当然,我也可以把订单全部加入队列,等数据检索完成后再遍历它们并计算总数。我应该这样做吗?我只是想在订单到达时立即处理,按类型计数,并把结果存入我的“聚合集合”。这可行吗?如果可行,该怎么做?