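The complete reproducible code is on GitHub; memory usage climbs quickly once the executable is started. Most of the code is in the AsyncBlockingQueue.cs class.

The following code implements a simple asynchronous "blocking" queue: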

        public async Task<T> DequeueAsync(
            int timeoutInMs = -1,
            CancellationToken cancellationToken = default)
        {
            try
            {
                using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs, cancellationToken))
                {
                    T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
                    return value;
                }
            }
            catch (ChannelClosedException cce)
            {
                await Console.Error.WriteLineAsync("Channel is closed.");
                throw new ObjectDisposedException("Queue is disposed");
            }
            catch (OperationCanceledException)
            {
                throw;
            }
            catch (Exception ex)
            {
                await Console.Error.WriteLineAsync("Dequeue failed.");
                throw;
            }
        }


        private CancellationTokenSource GetCancellationTokenSource(
            int timeoutInMs,
            CancellationToken cancellationToken)
        {
            if (timeoutInMs <= 0)
            {
                return null;
            }

            CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
            return cts;
        }

When it is used like this, memory leaks:

try
{
   string message = await this._inputQueue.DequeueAsync(10,cancellationToken).ConfigureAwait(false);
}
catch(OperationCanceledException){
   // timeout 
}

3 Answers

Update

From the comments:

there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up

This means that what's really needed is a way to batch messages by both count and period. Doing either one on its own is relatively easy.

This method batches by count. It adds messages to the batch list until the limit is reached, sends the data downstream and clears the list. Any leftover messages are sent once the input completes:

static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input, int count, CancellationToken token=default)
{
    //The output channel carries whole batches
    var channel=Channel.CreateUnbounded<Message[]>();
    var writer=channel.Writer;   

    _ = Task.Run(async ()=>{
        var batch=new List<Message>(count);
        await foreach(var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
            if(batch.Count==count)
            {
                await writer.WriteAsync(batch.ToArray());
                batch.Clear();
            }
        }
        //Send any leftover messages once the input completes
        if(batch.Count>0)
        {
            await writer.WriteAsync(batch.ToArray());
        }
    },token)
   .ContinueWith(t=>writer.TryComplete(t.Exception));
   return channel;
}

A method that batches by period is more complicated, as the timer can fire at the same time a message is received. Interlocked.Exchange replaces the existing batch list with a new one, and the old list is sent downstream as the batch:

static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input, TimeSpan period, CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message[]>();
    var writer=channel.Writer;   

    var batch=new List<Message>();
    //On every tick, swap in a fresh list and publish the old one
    var timer=new Timer(state =>{
        var data=Interlocked.Exchange(ref batch,new List<Message>());
        //TryWrite succeeds on an open unbounded channel and returns false once it is completed
        if(data.Count>0)
        {
            writer.TryWrite(data.ToArray());
        }
    },null,TimeSpan.Zero,period);

    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            //Caution: an Add that races with the swap above is not thread-safe
            batch.Add(msg);
        }
    },token)
   .ContinueWith(t=>{
        timer.Dispose();
        writer.TryComplete(t.Exception);
   });
   return channel;
}

Doing both is something I'm still working on. The problem is that the count limit and the timer expiration can be reached at the same time. In the worst case, lock(batch) can be used to ensure that only one of them (the timer thread or the read loop) sends the data downstream.
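One way to sidestep the race entirely is to let a single loop own the batch and wait on either "data is available" or "the tick fired". The following is only a sketch under that assumption, not the answer's finished solution: BatchByCountOrPeriod and ChannelBatchingExtensions are hypothetical names, and the tick runs on a fixed period rather than restarting after a count-based flush. Because the pending WaitToReadAsync call is kept alive across ticks instead of being cancelled, no lock is needed and no read is ever cancelled because of a timeout.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelBatchingExtensions
{
    // Hypothetical helper: emits a batch when `count` items are buffered
    // or when the periodic tick fires, whichever happens first.
    public static ChannelReader<T[]> BatchByCountOrPeriod<T>(
        this ChannelReader<T> input, int count, TimeSpan period,
        CancellationToken token = default)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));

        var channel = Channel.CreateUnbounded<T[]>();
        var writer = channel.Writer;

        _ = Task.Run(async () =>
        {
            Exception error = null;
            using var timerCts = CancellationTokenSource.CreateLinkedTokenSource(token);
            try
            {
                var batch = new List<T>(count);
                Task<bool> pendingWait = null;
                Task tick = Task.Delay(period, timerCts.Token);

                while (true)
                {
                    // Take whatever is already buffered, up to the limit.
                    while (batch.Count < count && input.TryRead(out var item))
                    {
                        batch.Add(item);
                    }
                    if (batch.Count >= count)
                    {
                        // The output is unbounded, so TryWrite succeeds while it is open.
                        writer.TryWrite(batch.ToArray());
                        batch.Clear();
                        continue;
                    }

                    // Keep one pending wait alive across ticks instead of cancelling it.
                    pendingWait ??= input.WaitToReadAsync(token).AsTask();
                    Task finished = await Task.WhenAny(pendingWait, tick);

                    if (finished == tick)
                    {
                        await tick; // surfaces cancellation, if any
                        if (batch.Count > 0)
                        {
                            writer.TryWrite(batch.ToArray());
                            batch.Clear();
                        }
                        tick = Task.Delay(period, timerCts.Token);
                    }
                    else
                    {
                        bool more = await pendingWait;
                        pendingWait = null;
                        if (!more) break; // input completed
                    }
                }

                // Flush the final partial batch.
                if (batch.Count > 0) writer.TryWrite(batch.ToArray());
            }
            catch (Exception ex)
            {
                error = ex;
            }
            finally
            {
                timerCts.Cancel(); // stop the outstanding delay
                writer.TryComplete(error);
            }
        });

        return channel.Reader;
    }
}
```

With five items already in a completed input channel, a count of 2 and a long period, reading the result with ReadAllAsync should yield the batches [1,2], [3,4] and [5]. Restarting the tick after every count-based flush would be closer to what a time-or-count buffer usually does, at the cost of creating and cancelling a delay per batch.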

Original Answer

Channels don't leak when used properly - just like any other container. A Channel isn't an asynchronous queue and definitely not a blocking one. It's a very different construct, with completely different idioms. It's a higher-level container that uses queues. There's a very good reason there are separate ChannelReader and ChannelWriter classes.

The typical scenario is to have a publisher create and own the channel. Only the publisher can write to that channel and call Complete() on it. Channel doesn't implement IDisposable so it can't be disposed. The publisher only provides a ChannelReader to subscribers.

Subscribers only see a ChannelReader and read from it until it completes. By using ReadAllAsync a subscriber can keep reading from a ChannelReader until it completes.

This is a typical example:

ChannelReader<Message> Producer(CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message>();
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        for(int i=0;i<100;i++)
        {
            //Check for cancellation
            if(token.IsCancellationRequested)
            {
                return;
            }
            //Simulate some work
            await Task.Delay(100);
            await writer.WriteAsync(new Message(...));          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    //This casts to a ChannelReader
    return channel;
}

The subscriber only needs a ChannelReader to work. By using ChannelReader.ReadAllAsync the subscriber only needs await foreach to process messages:

async Task Subscriber(ChannelReader<Message> input,CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        //Use the message
    }
}

The subscriber can produce its own messages by returning a ChannelReader. And this is where things become very interesting, as the Subscriber method becomes a step in a pipeline of chained steps. If we convert the methods to extension methods on ChannelReader we can easily create an entire pipeline.

Let's generate some numbers:

ChannelReader<int> Generate(int nums,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<int>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        for(int i=0;i<nums;i++)
        {
            //Check for cancellation
            if(token.IsCancellationRequested)
            {
                return;
            }

            await writer.WriteAsync(i*7);  
            await Task.Delay(100);        
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    //This casts to a ChannelReader
    return channel;
}

Then double them and take their square root:

static ChannelReader<double> Double(this ChannelReader<int> input,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<double>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(2.0*msg);          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

static ChannelReader<double> Root(this ChannelReader<double> input,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<double>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(Math.Sqrt(msg));          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

And finally print them:

static async Task Print(this ChannelReader<double> input,CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        Console.WriteLine(msg);
    }
}

Now we can build a pipeline:


await Generate(100)
          .Double()
          .Root()
          .Print();

And add a cancellation token to all steps:

using var cts=new CancellationTokenSource();
await Generate(100,cts.Token)
          .Double(cts.Token)
          .Root(cts.Token)
          .Print(cts.Token);

Memory usage can increase if one step produces messages faster than they're consumed for a long time. This is easily handled by using a bounded instead of an unbounded channel. This way, if a method is too slow all the previous methods will have to await before publishing new data.
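The back-pressure described above can be seen in isolation with a small sketch. BackpressureDemo and RunAsync are hypothetical names used only for illustration; the channel calls themselves are the standard System.Threading.Channels API, and the default full mode of a bounded channel (Wait) is assumed.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    public static async Task<bool> RunAsync()
    {
        // Capacity 2; the default full mode (Wait) makes writers wait when full.
        var channel = Channel.CreateBounded<int>(2);

        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);

        // The channel is full, so a synchronous write is refused...
        bool acceptedWhenFull = channel.Writer.TryWrite(3);

        // ...and an asynchronous write stays pending until a reader makes room.
        ValueTask pendingWrite = channel.Writer.WriteAsync(3);
        bool wasPending = !pendingWrite.IsCompleted;

        int first = await channel.Reader.ReadAsync(); // frees one slot
        await pendingWrite;                           // can complete now

        return !acceptedWhenFull && wasPending && first == 1;
    }
}
```

In a pipeline, that pending WriteAsync is what slows a fast producer down to the pace of the slowest downstream step, so the amount of buffered data stays bounded by the channel capacities.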

answered 2021-05-19T17:03:50.253

I was able to reproduce the issue you are observing. IMHO it is clearly a flaw in the Channels library. Here is my repro:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        var bufferBlock = new BufferBlock<int>();
        var asyncCollection = new Nito.AsyncEx.AsyncCollection<int>();
        var mem0 = GC.GetTotalMemory(true);
        int timeouts = 0;
        for (int i = 0; i < 10; i++)
        {
            var stopwatch = Stopwatch.StartNew();
            while (stopwatch.ElapsedMilliseconds < 500)
            {
                using var cts = new CancellationTokenSource(1);
                try
                {
                    await channel.Reader.ReadAsync(cts.Token);
                    //await bufferBlock.ReceiveAsync(cts.Token);
                    //await asyncCollection.TakeAsync(cts.Token);
                }
                catch (OperationCanceledException) { timeouts++; }
            }
            var mem1 = GC.GetTotalMemory(true);
            Console.WriteLine($"{i + 1,2}) Timeouts: {timeouts,5:#,0},"
                + $" Allocated: {mem1 - mem0:#,0} bytes");
        }
    }
}

Output:

 1) Timeouts:   124, Allocated: 175,664 bytes
 2) Timeouts:   250, Allocated: 269,720 bytes
 3) Timeouts:   376, Allocated: 362,544 bytes
 4) Timeouts:   502, Allocated: 453,264 bytes
 5) Timeouts:   628, Allocated: 548,080 bytes
 6) Timeouts:   754, Allocated: 638,800 bytes
 7) Timeouts:   880, Allocated: 729,584 bytes
 8) Timeouts: 1,006, Allocated: 820,304 bytes
 9) Timeouts: 1,132, Allocated: 919,216 bytes
10) Timeouts: 1,258, Allocated: 1,011,928 bytes

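Try it on Fiddle.

Around 800 bytes are leaked per operation, which is quite nasty. The memory is reclaimed every time a new value is written to the channel, so for a busy channel this design flaw should not be an issue. But for a channel that receives values only sporadically, it can be a showstopper.

There are other asynchronous queue implementations available that do not suffer from the same issue. You can try commenting out the await channel.Reader.ReadAsync(cts.Token); line and uncommenting either of the two lines below it. You will see that both the BufferBlock<T> from the TPL Dataflow library and the AsyncCollection<T> from the Nito.AsyncEx.Coordination package allow asynchronous retrieval from the queue with a timeout, without leaking memory.

answered 2021-05-20T08:53:08.770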

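I got so caught up in the technical details of the actual problem that I forgot the problem is already solved almost out of the box.

From the comments, it seems the actual question is:

there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up

This is provided by the Buffer operator of ReactiveX.NET, which is built by the same team that created System.Linq.Async: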

ChannelReader<Message> reader=_channel;

IAsyncEnumerable<IList<Message>> batchItems = reader.ReadAllAsync(token)
                                              .ToObservable()
                                              .Buffer(TimeSpan.FromSeconds(30), 5)
                                              .ToAsyncEnumerable();

await foreach(var batch in batchItems.WithCancellation(token))
{
 ....
}

These calls can be converted into an extension method, so instead of DequeueAsync, the question's class can have a BufferAsync or GetWorkItemsAsync method:

public IAsyncEnumerable<IList<T>> BufferAsync(
            TimeSpan timeSpan,
            int count,
            CancellationToken cancellationToken = default)
{
    return _channel.Reader.BufferAsync(timeSpan,count,cancellationToken);
}

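ToObservable and ToAsyncEnumerable are provided by System.Linq.Async and convert between IAsyncEnumerable and IObservable, the interface used by ReactiveX.NET.

Buffer is provided by System.Reactive and buffers items by count or period, even allowing overlapping sequences.

While LINQ and LINQ to Async provide query operators over objects, Rx.NET does the same over time-based streams of events. It is possible to aggregate over time, buffer events by time, throttle them and so on. The examples in the (unofficial) documentation page for Buffer show how to create overlapping sequences (e.g. sliding windows). The same page shows how Sample or Throttle can be used to throttle fast event streams by propagating only the last event in a period.

Rx uses a push model (new events are pushed to subscribers), while IAsyncEnumerable, like IEnumerable, uses a pull model. ToAsyncEnumerable() will cache items until they are requested, which can cause problems if nobody is listening.

With these methods, one can even create extension methods that buffer or throttle the publisher: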

    //Returns all items in a period
    public static IAsyncEnumerable<IList<T>> BufferAsync<T>(
        this ChannelReader<T> reader, 
        TimeSpan timeSpan, 
        int count,
        CancellationToken token = default)
    {
        return reader.ReadAllAsync(token)
            .ToObservable()
            .Buffer(timeSpan, count)
            .ToAsyncEnumerable();
    }
        
        
    //Return the latest item in a period
    public static IAsyncEnumerable<T> SampleAsync<T>(
        this ChannelReader<T> reader, 
        TimeSpan timeSpan,
        CancellationToken token = default)
    {
        return reader.ReadAllAsync(token)
            .ToObservable()
            .Sample(timeSpan)
            .ToAsyncEnumerable();
    }
answered 2021-05-24T08:40:54.947