0

我正在尝试在 Web api 中使用 SignalR 实现数据库更改通知。下面是 NotificationService.cs 的代码

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Npgsql;
using webapi.DBCalls;
using webapi.Models;

namespace webapi.Notification
{
public class NotificationService : IHostedService
{
    private IHubContext<HubConfig> _hub;
    private readonly ModelAppSettings _appSettings;
    private DBInterceptor _db_interceptor;

    public NotificationService(IHubContext<HubConfig> hub, IOptions<ModelAppSettings> appSettings)
    {
        _appSettings = appSettings.Value;
        _db_interceptor = new DBInterceptor(_appSettings);
        _hub = hub;
    }

    public Task StartAsync(CancellationToken stoppingToken)
    {
        onDataTableChangeListener();
        return Task.CompletedTask;
    }

    public void onDataTableChangeListener()
    {
            using (var connection = new NpgsqlConnection(_appSettings.ConnectionString))
            {
                connection.Open();
                connection.Notification += (o, e) => notifyDataChange(e.Payload);
                using (var cmd = new NpgsqlCommand("LISTEN datachange", connection))
                    cmd.ExecuteNonQuery();
                while (true)
                    connection.Wait();
            }
    }

    public void notifyDataChange(string payload)
    {
        //DO some work here

    }

    public Task StopAsync(CancellationToken stoppingToken)
    {
        return Task.CompletedTask;
    }
}
}

我在下面的ConfigurationService AS 下 的 Startup.cs 中注册此服务services.AddHostedService<NotificationService>();

当我运行程序时,它永远不会启动并挂在下面的行。

while (true) 
  connection.Wait();

我知道我需要重写这个方法,但不知道怎么写。任何帮助,将不胜感激。

4

1 回答 1

0

您没有IHostedService正确实现接口。您正在使用 阻塞当前线程connection.Wait()。您应该在StartAsyncThreadPool 上启动一个新线程或一个 Task,它将在后台处理通知。如果StopAsync需要,您将终止处理并执行清理。

这部分由BackgroundService您必须实现ExecuteAsync方法的类实现。使用BackgroundService类你的服务应该是这样的:

public class NotificationSerice : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var connection = new NpgsqlConnection("ConnectionString");
        await connection.OpenAsync();
        connection.Notification += (o, e) => notifyDataChange(e.Payload, stoppingToken);
        
        using (var cmd = new NpgsqlCommand("LISTEN datachange", connection))
            await cmd.ExecuteNonQueryAsync(stoppingToken);

        while (true)
            await connection.WaitAsync(stoppingToken);
    }

    private void notifyDataChange(string payload, CancellationToken stoppingToken)
    {
        ///
    }
}

在示例中,它被实现为异步方法。也可以使用Task.Run()which 在 ThreadPool 上运行任务。stoppingToken将在应用程序关闭时触发。

于 2021-12-27T20:22:49.027 回答