2

我正在使用ChannelfromSystem.Threading.Channels并希望批量读取项目(5 个项目),我有如下方法,

public class Batcher
{
    private readonly Channel<MeasurementViewModel> _channel;
    public Batcher()
    {
        _channel = Channel.CreateUnbounded<MeasurementViewModel>();
    }
    public async Task<MeasurementViewModel[]> ReadBatchAsync(int batchSize, CancellationToken stoppingToken)
    {
        var result = new MeasurementViewModel[batchSize];

        for (var i = 0; i < batchSize; i++)
        {
            result[i] = await _channel.Reader.ReadAsync(stoppingToken);
        }

        return result;
    }
}

在 asp.net 核心后台服务中,我正在使用它,如下所示,

public class WriterService : BackgroundService
{
    private readonly Batcher _batcher;
    public WriterService(Batcher batcher)
    {
        _batcher = batcher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var batchOfItems = await _batcher.ReadBatchAsync(5, stoppingToken);

            var range = string.Join(',', batchOfItems.Select(item => item.Value));

            var x = range;
        }
    }
}

这是有效的,只要有 5 个项目Channel,我就会得到range

问题是,当只剩下 2 个项目Channel并且自最后 10 分钟以来没有项目出现时Channel,那么如何读取剩余的 2 个项目Channel

4

1 回答 1

3

您可以创建一个linked CancellationTokenSource,这样您就可以同时观察外部取消请求和内部引发的超时。下面是一个使用这种技术的例子,通过ReadBatchAsync为类创建一个扩展方法ChannelReader

public static async ValueTask<T[]> ReadBatchAsync<T>(
    this ChannelReader<T> channelReader,
    int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var items = new List<T>(batchSize);
    using (var linkedCTS
        = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
    {
        linkedCTS.CancelAfter(timeout);
        while (true)
        {
            var token = items.Count == 0 ? cancellationToken : linkedCTS.Token;
            T item;
            try
            {
                item = await channelReader.ReadAsync(token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();
                break; // The cancellation was induced by timeout (ignore it)
            }
            catch (ChannelClosedException)
            {
                if (items.Count == 0) throw;
                break;
            }
            items.Add(item);
            if (items.Count >= batchSize) break;
        }
    }
    return items.ToArray();
}

如果该批次至少包含一个项目,则此方法将在指定的时间过去后立即生成批次timeout,或者如果已达到则更快地生成批次。batchSize否则,一旦收到第一个项目,它将产生一个单项目批次。

如果通道已通过调用该channel.Writer.Complete()方法完成,并且它不再包含项目,则该ReadBatchAsync方法会传播与ChannelClosedException本机ReadAsync方法抛出的相同内容。

如果外部CancellationToken被取消,取消将通过抛出一个OperationCanceledException. 此时可能已经从内部提取的任何项目ChannelReader<T>都将丢失。这使得取消功能成为破坏性操作。建议在此Channel<T>之后将整个丢弃。

使用示例:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (true)
    {
        MeasurementViewModel[] batch;
        try
        {
            batch = await _channel.Reader.ReadBatchAsync(
                5, TimeSpan.FromMinutes(10), stoppingToken);
        }
        catch (OperationCanceledException) { return; }
        catch (ChannelClosedException) { break; }

        Console.WriteLine(String.Join(',', batch.Select(item => item.Value)));
    }
    await _channel.Reader.Completion; // Propagate possible failure
}
于 2020-09-14T12:26:23.690 回答