1

我有一个 .NETBackgroundService用于使用 .NET 管理通知BlockingCollection<Notification>

我的实现是导致 CPU 使用率高,即使BlockingCollection.

我收集了一些转储,似乎我遇到了线程池饥饿。

我不确定应该如何重构以避免这种情况。

private readonly BlockingCollection<Notification> _notifications;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Task.Run(async () =>
        {
            await _notificationsContext.Database.MigrateAsync(stoppingToken);

            while (!stoppingToken.IsCancellationRequested)
            {

                foreach (var notification in _notifications.GetConsumingEnumerable(stoppingToken))
                {
                   // process notification
                }


            }
        }, stoppingToken);
    }

我也尝试删除 while 循环,但问题仍然存在。

核心转储屏幕截图

编辑:添加了制作人

 public abstract class CommandHandlerBase
    {
        private readonly BlockingCollection<Notification> _notifications;

        public CommandHandlerBase(BlockingCollection<Notification> notifications)
        {
            _notifications = notifications;
        }
        protected void EnqueueNotification(AlertImapact alertImapact,
                                           AlertUrgency alertUrgency,
                                           AlertSeverity alertServerity,
                                           string accountName,
                                           string summary,
                                           string details,
                                           bool isEnabled,
                                           Exception exception,
                                           CancellationToken cancellationToken = default)
        {

            var notification = new Notification(accountName, summary, details, DateTime.UtcNow, exception.GetType().ToString())
            {
                Imapact = alertImapact,
                Urgency = alertUrgency,
                Severity = alertServerity,
                IsSilenced = !isEnabled,
            };

            _notifications.Add(notification, cancellationToken);
        }
    }
4

3 回答 3

2

阻塞是昂贵的,但让线程休眠并重新调度更昂贵。为了避免这种情况,.NET 通常在实际阻塞线程之前使用SpinWait开始阻塞操作。spinwait 使用一个核心在一段时间内什么都不做,这会导致您观察到的 CPU 使用率。

要解决此问题,请使用像Channels这样的异步集合。

  • 通道允许您异步向其发布或读取消息,同时保留它们的顺序。
  • 它是线程安全的,这意味着多个读取器和写入器可以同时对其进行写入。
  • 您可以创建有界频道以防止发布者在频道已满时发帖。
  • 最后,您可以通过 读取 Channel 中的所有消息IAsyncEnumerable,使处理代码更容易。

避免阻塞通道

在您的情况下,代码可能会更改为:

private readonly Channel<Notification> _notifications=Channel.CreateUnbounded<Notification>();

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await _notificationsContext.Database.MigrateAsync(stoppingToken);

    await foreach(var notification in _notifications.Reader.ReadAllAsync(stoppingToken))
    {
               // process notification
    }
}

通道有意使用单独的接口进行读取和写入。要阅读,请使用Channel.Reader返回的ChannelReader类。要编写,请使用Channel.Writer返回的ChannelWriter类。可以将 Channel隐式转换为任一类型,从而轻松编写仅接受/生成 ChannelReader 或 ChannelWriter 的发布者和订阅者方法。

要写入通道,请使用 ChannelWriter 的WriteAsync方法:

await _notifications.Writer.WriteAsync(someNotification);

当您完成编写并想要关闭频道时,您需要在 writer 上调用Complete() :

await _notification.Writer.Complete();

处理循环将读取任何剩余的消息。要等待它完成,您需要等待ChannelReader.Completion任务:

await _notification.Reader.Completion;

从其他班级发帖

当您使用 BackgroundService 时,通知通常会来自其他类。这意味着发布者和服务都需要以某种方式访问​​同一频道。一种方法是使用辅助类并将其注入发布者和服务中。

该类MessageChannel<T>执行此操作并通过关闭编写器来处理应用程序终止:

public class MessageChannel<T>:IDisposable 
    {
        private readonly Channel<Envelope<T>> _channel;

        public ChannelReader<Envelope<T>> Reader => _channel;
        public ChannelWriter<Envelope<T>> Writer => _channel;

        public MessageChannel(IHostApplicationLifetime lifetime)
        {
            _channel = Channel.CreateBounded<Envelope<T>>(1);
            lifetime.ApplicationStopping.Register(() => Writer.TryComplete());
        }

        private readonly CancellationTokenSource _cts = new();

        public CancellationToken CancellationToken => _cts.Token;
        public void Stop()
        {
            _cts.Cancel();
        }

        public void Dispose()
        {
            _cts.Dispose();
        }
    }

这可以在后台服务中注入:

MessageChannel<Notification> _notifications;
ChannelReader<Notification> _reader;

public MyService(MessageChannel<Notification> notifications)
{
    _notifications=notifications;
    _reader=notifications.Reader;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await _notificationsContext.Database.MigrateAsync(stoppingToken);

    await foreach(var notification in _reader.ReadAllAsync(stoppingToken))
    {
               // process notification
    }
}
于 2021-10-15T12:41:02.593 回答
1

事实证明,该问题与另一个BackgroundService正在等待错误计算TimeSpan导致线程池饥饿的问题有关。

于 2021-10-15T16:41:50.773 回答
-3

虽然我认为对于提议的渠道解决方案可能存在争议,就像之前提出的那样,但我会投票支持更简单的解决方案,如果您愿意,渠道是为大量消息而设计的,所以如果有很多消息,请考虑它.

我怀疑你的高 CPU 发生是因为你的通知队列是空的并且你没有等待。

    公共类工人:BackgroundService
    {
        私有只读 ConcurrentQueue _messages = new ConcurrentQueue();               

        受保护的覆盖异步任务 ExecuteAsync(CancellationToken stoppingToken)
        {
            等待 Task.Factory.StartNew(() =>
            {
                而(!stoppingToken.IsCancellationRequested)
                {
                    等待 _notificationsContext.Database.MigrateAsync(stoppingToken);
                    while (_messages.TryDequeue(out var notification) && !stoppingToken.IsCancellationRequested)
                    {
                        //进程通知      
                    }
                    
                    //当你没有通知你不想进入疯狂循环的情况下的显式延迟,这是我怀疑正在发生的事情
                   Task.Delay(1000,stoppingToken).GetAwaiter().GetResult();
                }
            });
        }
    }
于 2021-10-15T15:15:04.237 回答