我需要有一个将在线程池中执行操作的类,但这些操作应该排队。例如:
方法一 方法二 方法三
当有人从他的线程中调用方法 1 时,他也可以调用方法 2 或方法 3,并且所有 3 个方法都可以同时执行,但是当用户对方法 1 或 2 或 3 进行另一个调用时,这一次线程池应该阻塞这些调用,直到旧的执行完成。
如下图所示:
我应该使用渠道吗?
我需要有一个将在线程池中执行操作的类,但这些操作应该排队。例如:
方法一 方法二 方法三
当有人从他的线程中调用方法 1 时,他也可以调用方法 2 或方法 3,并且所有 3 个方法都可以同时执行,但是当用户对方法 1 或 2 或 3 进行另一个调用时,这一次线程池应该阻塞这些调用,直到旧的执行完成。
如下图所示:
我应该使用渠道吗?
To should i use channels?
,答案是肯定的,但也有其他可用的功能。
数据流
.NET 已经通过 TPL 数据流类提供了此功能。您可以使用ActionBlock类将消息(即数据)传递给在后台以保证顺序和可配置的并行度执行的工作方法。频道是一项新功能,其工作基本相同。
您所描述的实际上是使用 ActionBlock 的最简单方法 - 只需向其发布数据消息并让它一一处理:
void Method1(MyDataObject1 data){...}
var block=new ActionBlock<MyDataObject1>(Method1);
//Start sending data to the block
for(var msg in someListOfItems)
{
block.PostAsync(msg);
}
默认情况下,ActionBlock 有一个无限的输入队列。它将只使用一个任务来异步处理消息,按照它们发布的顺序。
完成后,您可以告诉它Complete()
并异步等待所有剩余项目完成处理:
block.Complete();
await block.Completion;
要处理不同的方法,您可以简单地使用多个块,例如:
var block1=new ActionBlock<MyDataObject1>(Method1);
var block2=new ActionBlock<MyDataObject1>(Method2);
频道
通道是比块更低级别的功能。这意味着您必须编写更多代码,但您可以更好地控制“处理块”的工作方式。事实上,您可能可以使用通道重写 TPL Dataflow 库。
您可以使用以下(有点天真)方法创建类似于 ActionBlock 的处理块:
ChannelWriter<TIn> Work(Action<TIn> action)
{
var channel=Channel.CreateUnbounded<TIn>();
var workerTask=Task.Run(async ()=>{
await foreach(var msg in channel.Reader.ReadAllAsync())
{
action(msg);
}
})
var writer=channel.Writer;
return writer;
}
该方法创建一个通道并在后台运行一个任务来异步读取数据并进行处理。我在这里通过使用“有点”作弊,await foreach
并且ChannelReader.ReadAllAsync()
在 C#8 和 .NET Core 3.0 中可用。
此方法可以像块一样使用:
ChannelWriter<DataObject1> writer1 = Work(Method1);
foreach(var msg in someListOfItems)
{
writer1.WriteAsync(msg);
}
writer1.Complete();
不过,频道还有很多。例如,SignalR 使用它们来允许将通知流式传输到客户端。
这是我的建议。对于每个同步方法,都应该添加一个异步方法。例如该方法FireTheGun
是同步的:
private static void FireTheGun(int bulletsCount)
{
var ratata = Enumerable.Repeat("Ta", bulletsCount).Prepend("Ra");
Console.WriteLine(String.Join("-", ratata));
}
异步对应FireTheGunAsync
非常简单,因为将同步操作排队的复杂性委托给了辅助方法QueueAsync
。
public static Task FireTheGunAsync(int bulletsCount)
{
return QueueAsync(FireTheGun, bulletsCount);
}
这里是QueueAsync
. 每个动作都有其专用的SemaphoreSlim
, 以防止多个并发执行:
private static ConcurrentDictionary<MethodInfo, SemaphoreSlim> semaphores =
new ConcurrentDictionary<MethodInfo, SemaphoreSlim>();
public static Task QueueAsync<T1>(Action<T1> action, T1 param1)
{
return Task.Run(async () =>
{
var semaphore = semaphores
.GetOrAdd(action.Method, key => new SemaphoreSlim(1));
await semaphore.WaitAsync();
try
{
action(param1);
}
finally
{
semaphore.Release();
}
});
}
使用示例:
FireTheGunAsync(5);
FireTheGunAsync(8);
输出:
拉塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔
塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔塔Ta
实现QueueAsync
具有不同数量参数的版本应该是微不足道的。
更新:我之前的实现QueueAsync
有可能是不受欢迎的行为,它以随机顺序执行操作。发生这种情况是因为第二个任务可能是第一个获取信号量的任务。下面是一个保证正确执行顺序的实现。在高竞争的情况下性能可能会很差,因为每个任务都进入一个循环,直到它以正确的顺序获取信号量。
private class QueueInfo
{
public SemaphoreSlim Semaphore = new SemaphoreSlim(1);
public int TicketToRide = 0;
public int Current = 0;
}
private static ConcurrentDictionary<MethodInfo, QueueInfo> queues =
new ConcurrentDictionary<MethodInfo, QueueInfo>();
public static Task QueueAsync<T1>(Action<T1> action, T1 param1)
{
var queue = queues.GetOrAdd(action.Method, key => new QueueInfo());
var ticket = Interlocked.Increment(ref queue.TicketToRide);
return Task.Run(async () =>
{
while (true) // Loop until our ticket becomes current
{
await queue.Semaphore.WaitAsync();
try
{
if (Interlocked.CompareExchange(ref queue.Current,
ticket, ticket - 1) == ticket - 1)
{
action(param1);
break;
}
}
finally
{
queue.Semaphore.Release();
}
}
});
}
这个解决方案怎么样?
public class ConcurrentQueue
{
private Dictionary<byte, PoolFiber> Actionsfiber;
public ConcurrentQueue()
{
Actionsfiber = new Dictionary<byte, PoolFiber>()
{
{ 1, new PoolFiber() },
{ 2, new PoolFiber() },
{ 3, new PoolFiber() },
};
foreach (var fiber in Actionsfiber.Values)
{
fiber.Start();
}
}
public void ExecuteAction(Action Action , byte Code)
{
if (Actionsfiber.ContainsKey(Code))
Actionsfiber[Code].Enqueue(() => { Action.Invoke(); });
else
Console.WriteLine($"invalid byte code");
}
}
public static void SomeAction1()
{
Console.WriteLine($"{DateTime.Now} Action 1 is working");
for (long i = 0; i < 2400000000; i++)
{
}
Console.WriteLine($"{DateTime.Now} Action 1 stopped");
}
public static void SomeAction2()
{
Console.WriteLine($"{DateTime.Now} Action 2 is working");
for (long i = 0; i < 5000000000; i++)
{
}
Console.WriteLine($"{DateTime.Now} Action 2 stopped");
}
public static void SomeAction3()
{
Console.WriteLine($"{DateTime.Now} Action 3 is working");
for (long i = 0; i < 5000000000; i++)
{
}
Console.WriteLine($"{DateTime.Now} Action 3 stopped");
}
public static void Main(string[] args)
{
ConcurrentQueue concurrentQueue = new ConcurrentQueue();
concurrentQueue.ExecuteAction(SomeAction1, 1);
concurrentQueue.ExecuteAction(SomeAction2, 2);
concurrentQueue.ExecuteAction(SomeAction3, 3);
concurrentQueue.ExecuteAction(SomeAction1, 1);
concurrentQueue.ExecuteAction(SomeAction2, 2);
concurrentQueue.ExecuteAction(SomeAction3, 3);
Console.WriteLine($"press any key to exit the program");
Console.ReadKey();
}
输出 :
2019 年 8 月 5 日上午 7:56:57 行动 1 正在运行
2019 年 8 月 5 日上午 7:56:57 行动 3 正在运行
2019 年 8 月 5 日上午 7:56:57 行动 2 正在运行
2019 年 8 月 5 日上午 7:57:08 动作 1 已停止
2019 年 8 月 5 日上午 7:57:08 行动 1 正在运行
2019 年 8 月 5 日上午 7:57:15 动作 2 停止
2019 年 8 月 5 日上午 7:57:15 行动 2 正在运行
2019 年 8 月 5 日上午 7:57:16 动作 3 停止
2019 年 8 月 5 日上午 7:57:16 行动 3 正在运行
2019 年 8 月 5 日上午 7:57:18 动作 1 停止
2019 年 8 月 5 日上午 7:57:33 动作 2 停止
2019 年 8 月 5 日上午 7:57:33 动作 3 停止
poolFiber 是 ExitGames.Concurrency.Fibers 命名空间中的一个类。更多信息 :