我需要以 FIFO 方式处理来自生产者的数据,如果同一生产者产生新的数据位,则能够中止处理。
因此,我基于 Stephen Cleary 的AsyncCollection
(AsyncCollectionAbortableFifoQueue
在我的示例中调用)和 TPL 的BufferBlock
(BufferBlockAbortableAsyncFifoQueue
在我的示例中)实现了一个可中止的 FIFO 队列。这是基于的实现AsyncCollection
public class AsyncCollectionAbortableFifoQueue<T> : IExecutableAsyncFifoQueue<T>
{
private AsyncCollection<AsyncWorkItem<T>> taskQueue = new AsyncCollection<AsyncWorkItem<T>>();
private readonly CancellationToken stopProcessingToken;
public AsyncCollectionAbortableFifoQueue(CancellationToken cancelToken)
{
stopProcessingToken = cancelToken;
_ = processQueuedItems();
}
public Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken)
{
var tcs = new TaskCompletionSource<T>();
var item = new AsyncWorkItem<T>(tcs, action, cancelToken);
taskQueue.Add(item);
return tcs.Task;
}
protected virtual async Task processQueuedItems()
{
while (!stopProcessingToken.IsCancellationRequested)
{
try
{
var item = await taskQueue.TakeAsync(stopProcessingToken).ConfigureAwait(false);
if (item.CancelToken.HasValue && item.CancelToken.Value.IsCancellationRequested)
item.TaskSource.SetCanceled();
else
{
try
{
T result = await item.Action().ConfigureAwait(false);
item.TaskSource.SetResult(result); // Indicate completion
}
catch (Exception ex)
{
if (ex is OperationCanceledException && ((OperationCanceledException)ex).CancellationToken == item.CancelToken)
item.TaskSource.SetCanceled();
item.TaskSource.SetException(ex);
}
}
}
catch (Exception) { }
}
}
}
public interface IExecutableAsyncFifoQueue<T>
{
Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken);
}
processQueuedItems
是从队列中出列的任务AsyncWorkItem
,并执行它们,除非已请求取消。
要执行的异步操作被包装成一个AsyncWorkItem
看起来像这样的
internal class AsyncWorkItem<T>
{
public readonly TaskCompletionSource<T> TaskSource;
public readonly Func<Task<T>> Action;
public readonly CancellationToken? CancelToken;
public AsyncWorkItem(TaskCompletionSource<T> taskSource, Func<Task<T>> action, CancellationToken? cancelToken)
{
TaskSource = taskSource;
Action = action;
CancelToken = cancelToken;
}
}
然后有一个任务查找和出列项目以进行处理,或者处理它们,或者如果CancellationToken
已触发则中止。
一切正常 - 数据得到处理,如果接收到新数据,旧数据的处理将中止。我的问题现在源于这些队列泄漏大量内存,如果我提高使用率(生产者生产的比消费者进程多得多)。鉴于它是可中止的,未处理的数据应该被丢弃并最终从内存中消失。
那么让我们看看我是如何使用这些队列的。我有生产者和消费者的 1:1 匹配。每个消费者处理单个生产者的数据。每当我得到一个新的数据项,并且它与前一个不匹配时,我都会为给定的生产者(User.UserId)捕获队列或创建一个新的(代码片段中的“执行者”)。然后我有ConcurrentDictionary
一个CancellationTokenSource
每个生产者/消费者组合。如果有 previous CancellationTokenSource
,我Cancel
会在 20 秒后调用它Dispose
(立即处理会导致队列中出现异常)。然后我将新数据的处理排入队列。队列返回给我一个我可以等待的任务,以便我知道数据处理何时完成,然后我返回结果。
这是代码中的
internal class SimpleLeakyConsumer
{
private ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>> groupStateChangeExecutors = new ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>>();
private readonly ConcurrentDictionary<string, CancellationTokenSource> userStateChangeAborters = new ConcurrentDictionary<string, CancellationTokenSource>();
protected CancellationTokenSource serverShutDownSource;
private readonly int operationDuration = 1000;
internal SimpleLeakyConsumer(CancellationTokenSource serverShutDownSource, int operationDuration)
{
this.serverShutDownSource = serverShutDownSource;
this.operationDuration = operationDuration * 1000; // convert from seconds to milliseconds
}
internal async Task<bool> ProcessStateChange(string userId)
{
var executor = groupStateChangeExecutors.GetOrAdd(userId, new AsyncCollectionAbortableFifoQueue<bool>(serverShutDownSource.Token));
CancellationTokenSource oldSource = null;
using (var cancelSource = userStateChangeAborters.AddOrUpdate(userId, new CancellationTokenSource(), (key, existingValue) =>
{
oldSource = existingValue;
return new CancellationTokenSource();
}))
{
if (oldSource != null && !oldSource.IsCancellationRequested)
{
oldSource.Cancel();
_ = delayedDispose(oldSource);
}
try
{
var executionTask = executor.EnqueueTask(async () => { await Task.Delay(operationDuration, cancelSource.Token).ConfigureAwait(false); return true; }, cancelSource.Token);
var result = await executionTask.ConfigureAwait(false);
userStateChangeAborters.TryRemove(userId, out var aborter);
return result;
}
catch (Exception e)
{
if (e is TaskCanceledException || e is OperationCanceledException)
return true;
else
{
userStateChangeAborters.TryRemove(userId, out var aborter);
return false;
}
}
}
}
private async Task delayedDispose(CancellationTokenSource src)
{
try
{
await Task.Delay(20 * 1000).ConfigureAwait(false);
}
finally
{
try
{
src.Dispose();
}
catch (ObjectDisposedException) { }
}
}
}
在这个示例实现中,所做的只是等待,然后返回 true。
为了测试这种机制,我编写了以下数据生产者类:
internal class SimpleProducer
{
//variables defining the test
readonly int nbOfusers = 10;
readonly int minimumDelayBetweenTest = 1; // seconds
readonly int maximumDelayBetweenTests = 6; // seconds
readonly int operationDuration = 3; // number of seconds an operation takes in the tester
private readonly Random rand;
private List<User> users;
private readonly SimpleLeakyConsumer consumer;
protected CancellationTokenSource serverShutDownSource, testAbortSource;
private CancellationToken internalToken = CancellationToken.None;
internal SimpleProducer()
{
rand = new Random();
testAbortSource = new CancellationTokenSource();
serverShutDownSource = new CancellationTokenSource();
generateTestObjects(nbOfusers, 0, false);
consumer = new SimpleLeakyConsumer(serverShutDownSource, operationDuration);
}
internal void StartTests()
{
if (internalToken == CancellationToken.None || internalToken.IsCancellationRequested)
{
internalToken = testAbortSource.Token;
foreach (var user in users)
_ = setNewUserPresence(internalToken, user);
}
}
internal void StopTests()
{
testAbortSource.Cancel();
try
{
testAbortSource.Dispose();
}
catch (ObjectDisposedException) { }
testAbortSource = new CancellationTokenSource();
}
internal void Shutdown()
{
serverShutDownSource.Cancel();
}
private async Task setNewUserPresence(CancellationToken token, User user)
{
while (!token.IsCancellationRequested)
{
var nextInterval = rand.Next(minimumDelayBetweenTest, maximumDelayBetweenTests);
try
{
await Task.Delay(nextInterval * 1000, testAbortSource.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
break;
}
//now randomly generate a new state and submit it to the tester class
UserState? status;
var nbStates = Enum.GetValues(typeof(UserState)).Length;
if (user.CurrentStatus == null)
{
var newInt = rand.Next(nbStates);
status = (UserState)newInt;
}
else
{
do
{
var newInt = rand.Next(nbStates);
status = (UserState)newInt;
}
while (status == user.CurrentStatus);
}
_ = sendUserStatus(user, status.Value);
}
}
private async Task sendUserStatus(User user, UserState status)
{
await consumer.ProcessStateChange(user.UserId).ConfigureAwait(false);
}
private void generateTestObjects(int nbUsers, int nbTeams, bool addAllUsersToTeams = false)
{
users = new List<User>();
for (int i = 0; i < nbUsers; i++)
{
var usr = new User
{
UserId = $"User_{i}",
Groups = new List<Team>()
};
users.Add(usr);
}
}
}
它使用类开头的变量来控制测试。您可以定义用户数量(nbOfusers
- 每个用户都是产生新数据的生产者)、用户产生下一个数据之间的最小 ( minimumDelayBetweenTest
) 和最大 ( ) 延迟以及消费者处理数据所需的时间 ( )。maximumDelayBetweenTests
operationDuration
StartTests
开始实际测试,然后StopTests
再次停止测试。
我这样称呼这些
static void Main(string[] args)
{
var tester = new SimpleProducer();
Console.WriteLine("Test successfully started, type exit to stop");
string str;
do
{
str = Console.ReadLine();
if (str == "start")
tester.StartTests();
else if (str == "stop")
tester.StopTests();
}
while (str != "exit");
tester.Shutdown();
}
因此,如果我运行我的测试器并键入“开始”,则Producer
该类开始生成由Consumer
. 并且内存使用量开始增长和增长。该示例配置到极端,我正在处理的实际场景不那么密集,但是生产者的一个动作可能会触发消费者端的多个动作,这些动作也必须以相同的异步可中止 fifo 方式执行 -所以最坏的情况是,生成的一组数据会触发约 10 个消费者的操作(为简洁起见,我删除了最后一部分)。
当我有 100 个生产者时,每个生产者每 1-6 秒产生一个新数据项(随机产生的数据也是随机的)。消耗数据需要 3 秒。所以在很多情况下,在旧数据被正确处理之前就有一组新数据。
查看两个连续的内存转储,很明显内存使用量来自哪里......所有与队列有关的片段。鉴于我正在处理每个 TaskCancellationSource 并且没有保留对生成数据的任何引用(以及AsyncWorkItem
它们被放入的数据),我无法解释为什么这会一直占用我的内存,我希望其他人可以告诉我我的方式的错误。你也可以通过输入'stop'来中止测试。你会看到内存不再被吃掉,但即使你暂停并触发GC,内存也没有被释放。
可运行形式的项目源代码在Github上。启动后,你必须start
在控制台中输入(加回车)来告诉生产者开始生产数据。您可以通过键入stop
(加回车)来停止生成数据