假设我提供了一个由 、 和 方法组成的事件生成器 API Start()
,Pause()
以及Resume()
一个ItemAvailable
事件。生产者本身是外部代码,我无法控制它的线程。一些项目在被调用后可能仍然通过Pause()
(生产者实际上是远程的,所以项目可能已经在网络上传输)。
还假设我正在编写消费者代码,其中消费可能比生产慢。
关键要求是
- 消费者事件处理程序不得阻塞生产者线程,并且
- 必须处理所有事件(不能删除任何数据)。
我在消费者中引入了一个缓冲区来消除一些突发性。但是在扩展突发的情况下,我想调用Producer.Pause()
,然后Resume()
在适当的时候调用,以避免在消费者端耗尽内存。
我有一个解决方案,Interlocked
用于增加和减少一个计数器,该计数器与一个阈值进行比较以确定是时间到Pause
还是Resume
.
问题:就效率(和优雅)而言,有没有比Interlocked
计数器(在下面的代码中)更好的解决方案?int current
更新的 MVP(不再从限制器中反弹):
namespace Experiments
{
internal class Program
{
// simple external producer API for demo purposes
private class Producer
{
public void Pause(int i) { _blocker.Reset(); Console.WriteLine($"paused at {i}"); }
public void Resume(int i) { _blocker.Set(); Console.WriteLine($"resumed at {i}"); }
public async Task Start()
{
await Task.Run
(
() =>
{
for (int i = 0; i < 10000; i++)
{
_blocker.Wait();
ItemAvailable?.Invoke(this, i);
}
}
);
}
public event EventHandler<int> ItemAvailable;
private ManualResetEventSlim _blocker = new(true);
}
private static async Task Main(string[] args)
{
var p = new Producer();
var buffer = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true });
int threshold = 1000;
int resumeAt = 10;
int current = 0;
int paused = 0;
p.ItemAvailable += (_, i) =>
{
if (Interlocked.Increment(ref current) >= threshold
&& Interlocked.CompareExchange(ref paused, 0, 1) == 0
) p.Pause(i);
buffer.Writer.TryWrite(i);
};
var processor = Task.Run
(
async () =>
{
await foreach (int i in buffer.Reader.ReadAllAsync())
{
Console.WriteLine($"processing {i}");
await Task.Delay(10);
if
(
Interlocked.Decrement(ref current) < resumeAt
&& Interlocked.CompareExchange(ref paused, 1, 0) == 1
) p.Resume(i);
}
}
);
p.Start();
await processor;
}
}
}