Update
From the comments:
there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up
This means that what's really needed is a way to batch messages by both count and period. Doing either one on its own is relatively easy.
This method batches by count. It adds messages to a batch list until the limit is reached, then sends the data downstream and clears the list:
static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input, int count, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<Message[]>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        var batch = new List<Message>(count);
        await foreach (var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
            if (batch.Count == count)
            {
                await writer.WriteAsync(batch.ToArray());
                batch.Clear();
            }
        }
        //Flush any partial batch left over when the input completes
        if (batch.Count > 0)
        {
            await writer.WriteAsync(batch.ToArray());
        }
    }, token)
    //Complete and propagate any exceptions
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel;
}
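Usage is the same as any other pipeline step. For example (a hypothetical wiring that reuses the Producer method shown in the original answer below):

//Hypothetical usage; Producer() is the example method further down
await foreach (var batch in Producer().BatchByCount(10).ReadAllAsync())
{
    //Process 10 messages at a time
}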
A method that batches by period is more complicated, as the timer can fire at the same time a message is received. Interlocked.Exchange replaces the existing batch list with a new one and sends the batched data downstream:
static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input, TimeSpan period, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<Message[]>();
    var writer = channel.Writer;
    var batch = new List<Message>();
    var timer = new Timer(_ =>
    {
        //Swap in a fresh list and send whatever was batched so far
        var data = Interlocked.Exchange(ref batch, new List<Message>());
        if (data.Count > 0)
        {
            //The channel is unbounded, so TryWrite only fails after completion
            writer.TryWrite(data.ToArray());
        }
    }, null, TimeSpan.Zero, period);
    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
        }
    }, token)
    .ContinueWith(t =>
    {
        timer.Dispose();
        writer.TryComplete(t.Exception);
    });
    return channel;
}
To do both - I'm still working on that. The problem is that the count limit and the timer expiration can occur at the same time. Worst case, a lock can be used to ensure that only one of the timer callback or the reading loop sends the data downstream at a time, as in the sketch below.
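A rough sketch of that idea (untested, and an assumption on my part rather than finished code) guards the batch with a dedicated lock object, so the timer callback and the reading loop never flush the same messages twice, and flushes on whichever limit is hit first:

static ChannelReader<Message[]> BatchByCountOrPeriod(this ChannelReader<Message> input, int count, TimeSpan period, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<Message[]>();
    var writer = channel.Writer;
    var batch = new List<Message>(count);
    var gate = new object();

    //Copies and clears the batch under the lock, so the timer and the loop
    //can't both send the same messages downstream
    void Flush()
    {
        Message[] data;
        lock (gate)
        {
            if (batch.Count == 0) return;
            data = batch.ToArray();
            batch.Clear();
        }
        //The channel is unbounded, so TryWrite only fails after completion
        writer.TryWrite(data);
    }

    var timer = new Timer(_ => Flush(), null, period, period);

    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
        {
            bool full;
            lock (gate)
            {
                batch.Add(msg);
                full = batch.Count >= count;
            }
            if (full) Flush();
        }
        //Flush whatever is left when the input completes
        Flush();
    }, token)
    .ContinueWith(t =>
    {
        timer.Dispose();
        writer.TryComplete(t.Exception);
    });

    return channel;
}

The method name BatchByCountOrPeriod and the use of TryWrite instead of WriteAsync are my own choices here; since the output channel is unbounded, TryWrite only fails once the writer has been completed.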
Original Answer
Channels don't leak when used properly - just like any other container. A Channel isn't an asynchronous queue and definitely not a blocking one. It's a very different construct, with completely different idioms. It's a higher-level container that uses queues. There's a very good reason there are separate ChannelReader and ChannelWriter classes.
The typical scenario is to have a publisher create and own the channel. Only the publisher can write to that channel and call Complete() on it. Channel doesn't implement IDisposable, so it can't be disposed. The publisher only provides a ChannelReader to subscribers.
Subscribers only see a ChannelReader and read from it until it completes. By using ReadAllAsync a subscriber can keep reading until the writer completes the channel.
This is a typical example:
ChannelReader<Message> Producer(CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<Message>();
    var writer = channel.Writer;
    //Create the actual "publisher" worker
    _ = Task.Run(async () =>
    {
        for (int i = 0; i < 100; i++)
        {
            //Check for cancellation
            if (token.IsCancellationRequested)
            {
                return;
            }
            //Simulate some work
            await Task.Delay(100);
            await writer.WriteAsync(new Message(...));
        }
    }, token)
    //Complete and propagate any exceptions
    .ContinueWith(t => writer.TryComplete(t.Exception));
    //Implicit conversion from Channel<Message> to ChannelReader<Message>
    return channel;
}
The subscriber only needs a ChannelReader to work. By using ChannelReader.ReadAllAsync, processing the messages is just an await foreach:
async Task Subscriber(ChannelReader<Message> input, CancellationToken token = default)
{
    await foreach (var msg in input.ReadAllAsync(token))
    {
        //Use the message
    }
}
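Wiring the two together is then just a matter of passing the reader along:

//Hypothetical usage; the cancellation tokens are optional
await Subscriber(Producer());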
The subscriber can produce its own messages by returning a ChannelReader. And this is where things become very interesting, as the Subscriber method becomes a step in a pipeline of chained steps. If we convert the methods to extension methods on ChannelReader we can easily create an entire pipeline.
Let's generate some numbers:
ChannelReader<int> Generate(int nums, CancellationToken token = default)
{
    var channel = Channel.CreateBounded<int>(10);
    var writer = channel.Writer;
    //Create the actual "publisher" worker
    _ = Task.Run(async () =>
    {
        for (int i = 0; i < nums; i++)
        {
            //Check for cancellation
            if (token.IsCancellationRequested)
            {
                return;
            }
            await writer.WriteAsync(i * 7);
            await Task.Delay(100);
        }
    }, token)
    //Complete and propagate any exceptions
    .ContinueWith(t => writer.TryComplete(t.Exception));
    //Implicit conversion to ChannelReader<int>
    return channel;
}
Then double them and take the square root:
ChannelReader<double> Double(this ChannelReader<int> input, CancellationToken token = default)
{
    var channel = Channel.CreateBounded<double>(10);
    var writer = channel.Writer;
    //Create the actual "publisher" worker
    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(2.0 * msg);
        }
    }, token)
    //Complete and propagate any exceptions
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel;
}
ChannelReader<double> Root(this ChannelReader<double> input, CancellationToken token = default)
{
    var channel = Channel.CreateBounded<double>(10);
    var writer = channel.Writer;
    //Create the actual "publisher" worker
    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(Math.Sqrt(msg));
        }
    }, token)
    //Complete and propagate any exceptions
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel;
}
And finally, print them:
async Task Print(this ChannelReader<double> input, CancellationToken token = default)
{
    await foreach (var msg in input.ReadAllAsync(token))
    {
        Console.WriteLine(msg);
    }
}
Now we can build a pipeline:
await Generate(100)
      .Double()
      .Root()
      .Print();
And add a cancellation token to all steps:
using var cts = new CancellationTokenSource();
await Generate(100, cts.Token)
      .Double(cts.Token)
      .Root(cts.Token)
      .Print(cts.Token);
Memory usage can increase if one step produces messages faster than the next one consumes them for a long time. This is easily handled by using a bounded instead of an unbounded channel. That way, if a step is too slow, all the previous steps will have to await before publishing new data.
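For example, a step can create its channel with an explicit capacity and options. This is just a sketch; the capacity of 10 mirrors the steps above and is otherwise arbitrary:

var channel = Channel.CreateBounded<double>(new BoundedChannelOptions(10)
{
    SingleWriter = true,  //only the worker task writes
    SingleReader = true   //only the next step reads
});
//WriteAsync now waits when the buffer is full, applying backpressure upstream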