0

假设我提供了一个由 、 和 方法组成的事件生成器 API Start()Pause()以及Resume()一个ItemAvailable事件。生产者本身是外部代码,我无法控制它的线程。一些项目在被调用后可能仍然通过Pause()(生产者实际上是远程的,所以项目可能已经在网络上传输)。

还假设我正在编写消费者代码,其中消费可能比生产慢。

关键要求是

  1. 消费者事件处理程序不得阻塞生产者线程,并且
  2. 必须处理所有事件(不能删除任何数据)。

我在消费者中引入了一个缓冲区来消除一些突发性。但是在扩展突发的情况下,我想调用Producer.Pause(),然后Resume()在适当的时候调用,以避免在消费者端耗尽内存。

我有一个解决方案,Interlocked用于增加和减少一个计数器,该计数器与一个阈值进行比较以确定是时间到Pause还是Resume.

问题:就效率(和优雅)而言,有没有比Interlocked计数器(在下面的代码中)更好的解决方案?int current

更新的 MVP(不再从限制器中反弹):

namespace Experiments
{
    internal class Program
    {
        // simple external producer API for demo purposes
        private class Producer
        {
            public void Pause(int i) { _blocker.Reset(); Console.WriteLine($"paused at {i}"); }
            public void Resume(int i) { _blocker.Set(); Console.WriteLine($"resumed  at {i}"); }
            public async Task Start()
            {
                await Task.Run
                (
                    () =>
                    {
                        for (int i = 0; i < 10000; i++)
                        {
                            _blocker.Wait();
                            ItemAvailable?.Invoke(this, i);
                        }
                    }
                );
            }

            public event EventHandler<int> ItemAvailable;
            private ManualResetEventSlim _blocker = new(true);
        }

        private static async Task Main(string[] args)
        {
            var p = new Producer();
            var buffer = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true });
            int threshold = 1000;
            int resumeAt = 10;
            int current = 0;
            int paused = 0;

            p.ItemAvailable += (_, i) =>
            {
                if (Interlocked.Increment(ref current) >= threshold
                    && Interlocked.CompareExchange(ref paused, 0, 1) == 0
                ) p.Pause(i);

                buffer.Writer.TryWrite(i);
            };

            var processor = Task.Run
            (
                async () =>
                {
                    await foreach (int i in buffer.Reader.ReadAllAsync())
                    {
                        Console.WriteLine($"processing {i}");
                        await Task.Delay(10);
                        if
                        (
                            Interlocked.Decrement(ref current) < resumeAt
                            && Interlocked.CompareExchange(ref paused, 1, 0) == 1
                        ) p.Resume(i);
                    }
                }
            );

            p.Start();
            await processor;
        }
    }
}
4

2 回答 2

1

如果您的目标是优雅,您可以考虑在 custom 中烘焙压力感知功能Channel<T>。下面是一个PressureAwareUnboundedChannel<T>派生自Channel<T>. 它提供了基类的所有功能,并且在通道受到压力以及压力释放时发出通知。通知通过一个IProgress<bool>实例推送,true当压力超过特定的高阈值时发出一个值,false当压力下降到特定的低阈值时发出一个值。

public sealed class PressureAwareUnboundedChannel<T> : Channel<T>
{
    private readonly Channel<T> _channel;
    private readonly int _highPressureThreshold;
    private readonly int _lowPressureThreshold;
    private readonly IProgress<bool> _pressureProgress;
    private int _pressureState = 0; // 0: no pressure, 1: under pressure

    public PressureAwareUnboundedChannel(int lowPressureThreshold,
        int highPressureThreshold, IProgress<bool> pressureProgress)
    {
        if (lowPressureThreshold < 0)
            throw new ArgumentOutOfRangeException(nameof(lowPressureThreshold));
        if (highPressureThreshold < lowPressureThreshold)
            throw new ArgumentOutOfRangeException(nameof(highPressureThreshold));
        if (pressureProgress == null)
            throw new ArgumentNullException(nameof(pressureProgress));
        _highPressureThreshold = highPressureThreshold;
        _lowPressureThreshold = lowPressureThreshold;
        _pressureProgress = pressureProgress;
        _channel = Channel.CreateBounded<T>(Int32.MaxValue);
        this.Writer = new ChannelWriter(this);
        this.Reader = new ChannelReader(this);
    }

    private class ChannelWriter : ChannelWriter<T>
    {
        private readonly PressureAwareUnboundedChannel<T> _parent;

        public ChannelWriter(PressureAwareUnboundedChannel<T> parent)
            => _parent = parent;
        public override bool TryComplete(Exception error = null)
            => _parent._channel.Writer.TryComplete(error);
        public override bool TryWrite(T item)
        {
            bool success = _parent._channel.Writer.TryWrite(item);
            if (success) _parent.SignalWriteOrRead();
            return success;
        }
        public override ValueTask<bool> WaitToWriteAsync(
            CancellationToken cancellationToken = default)
                => _parent._channel.Writer.WaitToWriteAsync(cancellationToken);
    }

    private class ChannelReader : ChannelReader<T>
    {
        private readonly PressureAwareUnboundedChannel<T> _parent;

        public ChannelReader(PressureAwareUnboundedChannel<T> parent)
            => _parent = parent;
        public override Task Completion => _parent._channel.Reader.Completion;
        public override bool CanCount => _parent._channel.Reader.CanCount;
        public override int Count => _parent._channel.Reader.Count;
        public override bool TryRead(out T item)
        {
            bool success = _parent._channel.Reader.TryRead(out item);
            if (success) _parent.SignalWriteOrRead();
            return success;
        }
        public override ValueTask<bool> WaitToReadAsync(
            CancellationToken cancellationToken = default)
                => _parent._channel.Reader.WaitToReadAsync(cancellationToken);
    }

    private void SignalWriteOrRead()
    {
        var currentCount = _channel.Reader.Count;
        bool underPressure;
        if (currentCount > _highPressureThreshold)
            underPressure = true;
        else if (currentCount <= _lowPressureThreshold)
            underPressure = false;
        else
            return;
        int newState = underPressure ? 1 : 0;
        int oldState = underPressure ? 0 : 1;
        if (Interlocked.CompareExchange(
            ref _pressureState, newState, oldState) != oldState) return;
        _pressureProgress.Report(underPressure);
    }
}

封装Channel<T>的实际上是一个有界通道,容量等于最大值Int32,因为只有有界通道实现了该Reader.Count属性。

使用示例:

var progress = new Progress<bool>(underPressure =>
{
    if (underPressure) Producer.Pause(); else Producer.Resume();
});
var channel = new PressureAwareUnboundedChannel<Item>(500, 1000, progress);

在这个例子中,Producer当存储在通道内的项目超过 1000 时将暂停,当项目数降至 500 或更少时将恢复。

Progress<bool>操作是在Progress<bool>创建时捕获的上下文中调用的。因此,如果您在 GUI 应用程序的 UI 线程上创建它,则该操作将在 UI 线程上调用,否则 in 将在ThreadPool. 在后一种情况下,将无法防止Action<bool>. 如果Producer该类不是线程安全的,则必须在处理程序中添加同步。例子:

var progress = new Progress<bool>(underPressure =>
{
    lock (Producer) if (underPressure) Producer.Pause(); else Producer.Resume();
});
于 2021-09-22T12:37:52.003 回答
1

如果您意识到这个问题有三个“步骤”,这相对简单。

  1. 第一步ToChannel(Producer)接收来自生产者的消息。
  2. 下一步,如果输出面板中有太多待处理的项目,则会PauseAt发出信号。pause()
  3. 第三步,如果其输入通道的计数小于阈值,则ResumeAt发出信号。resume()

使用典型的通道模式很容易组合所有三个步骤。


producer.ToChannel(token)
    .PauseAt(1000,()=>producer.PauseAsync(),token)
    .ResumeAt(10,()=>producer.ResumeAsync(),token)
    ....

或者一个单一的通用TrafficJam方法:

static ChannelReader<T> TrafficJam(this ChannelReader<T> source,
    int pauseAt,int resumeAt,
    Func<Task> pause,Func<Task> resume,
    CancellationToken token=default)
{
    return source
             .PauseAt(pauseAt,pause,token)
             .ResumeAt(resumeAt,resume,token);
}

到频道

第一步相对简单,基于生产者事件的无界 Channel 源。

static ChannelReader<int> ToChannel(this Producer producer,
                                    CancellationToken token=default)
{
    Channel<int> channel=Channel.CreateUnbounded();
    var writer=channel.Writer;
    producer.ItemAvailable += OnItem;
    return channel;

    void OnItem(object sender, int item)
    {
        writer.TryWriteAsync(item);
        if(token.IsCancellationRequested)
        {
            producer.ItemAvailable-=OnItem;
            writer.Complete();
            
        }
    }
}

唯一不寻常的部分是使用本地函数来允许禁用事件处理程序并在请求取消时完成输出通道

这足以将所有传入的项目排队。ToChannel不打扰开始,暂停等,那不是它的工作。

暂停

下一个函数PauseAt使用 BoundedChannel 来实现阈值。如果可以,它会转发传入的消息。如果通道不能再接受任何消息,它会调用pause回调并等待它可以恢复转发:

static ChannelReader<T> PauseAt(this ChannelReader<T> source, 
        int threshold, Func<Task> pause,
        CancellationToken token=default)
{
    Channel<T> channel=Channel.CreateBounded(threshold);
    var writer=channel.Writer;

    _ = Task.Run(async ()=>
        await foreach(var msg in source.ReadAllAsync(token))
        {
            if(writer.CanWrite())
            {
               await writer.WriteAsync(msg);
            }
            else
            {
               await pause();
               //Wait until we can post again
               await writer.WriteAsync(msg);
            }
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

简历在

如果其输入先前高于阈值并且现在具有较少的项目,则最后一步ResumeAt调用。resume()

如果输入不受限制,它只会转发所有消息。

static ChannelReader<T> ResumeAt(this ChannelReader<T> source, 
        int resumeAt, Func<Task> resume,
        CancellationToken token=default)
{
    Channel<T> channel=Channel.CreateUnbounded();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        bool above=false;
        await foreach(var msg in source.ReadAllAsync(token))
        {
            await writer.WriteAsync(msg);
            //Do nothing if the source isn't bounded
            if(source.CanCount)
            {
                if(above && source.Count<=resumeAt)
                {
                    await resume();
                    above=false;
                }       
                above=source.Count>resumeAt;  
            }
       }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

由于只使用了一个线程,我们可以保留之前的计数。以及它是否高于或低于阈值。

结合暂停和恢复

由于PauseResume使用通道,它们可以组合成一个方法:

static ChannelReader<T> TrafficJam(this ChannelReader<T> source,
    int pauseAt,int resumeAt,
    Func<Task> pause,Func<Task> resume,
    CancellationToken token=default)
{
    return source.PauseAt(pauseAt,pause,token)
             .ResumeAt(resumeAt,resume,token);
}
于 2021-09-22T15:09:35.430 回答