6

我在 startup.cs 上配置服务器如下

GlobalHost.HubPipeline.RequireAuthentication();

// Make long polling connections wait a maximum of 110 seconds for a
// response. When that time expires, trigger a timeout command and
// make the client reconnect.
GlobalHost.Configuration.ConnectionTimeout = TimeSpan.FromSeconds(40);
// Wait a maximum of 30 seconds after a transport connection is lost
// before raising the Disconnected event to terminate the SignalR connection.
GlobalHost.Configuration.DisconnectTimeout = TimeSpan.FromSeconds(30);
// For transports other than long polling, send a keepalive packet every
// 10 seconds. 
// This value must be no more than 1/3 of the DisconnectTimeout value.
GlobalHost.Configuration.KeepAlive = TimeSpan.FromSeconds(10);

GlobalHost.HubPipeline.AddModule(new SOHubPipelineModule());
var hubConfiguration = new HubConfiguration { EnableDetailedErrors = true  };

var heartBeat = GlobalHost.DependencyResolver.Resolve<ITransportHeartbeat>();
var monitor = new PresenceMonitor(heartBeat);
monitor.StartMonitoring();

app.MapSignalR(hubConfiguration);

PresenceMonitor负责检查 unlive data 的类在哪里。因为我使用以下代码将它们保存在数据库中

public class PresenceMonitor
{
    private readonly ITransportHeartbeat _heartbeat;
    private Timer _timer;
    // How often we plan to check if the connections in our store are valid
    private readonly TimeSpan _presenceCheckInterval = TimeSpan.FromSeconds(40);

    // How many periods need pass without an update to consider a connection invalid
    private const int periodsBeforeConsideringZombie = 1;

    // The number of seconds that have to pass to consider a connection invalid.
    private readonly int _zombieThreshold;

    public PresenceMonitor(ITransportHeartbeat heartbeat)
    {
        _heartbeat = heartbeat;
        _zombieThreshold = (int)_presenceCheckInterval.TotalSeconds * periodsBeforeConsideringZombie;
    }

    public async void StartMonitoring()
    {
        if (_timer == null)
        {
            _timer = new Timer(_ =>
            {
                try
                {
                    Check();
                }
                catch (Exception ex)
                {
                    // Don't throw on background threads, it'll kill the entire process
                    Trace.TraceError(ex.Message);
                }
            },
            null,
            TimeSpan.Zero,
            _presenceCheckInterval);
        }
    }

    private async void Check()
    {
        // Get all connections on this node and update the activity
        foreach (var trackedConnection in _heartbeat.GetConnections())
        {
            if (!trackedConnection.IsAlive)
            {
                await trackedConnection.Disconnect();
                continue;
            }
            var log = AppLogFactory.Create<WebApiApplication>();
            log.Info($"{trackedConnection.ConnectionId} still live ");

            var connection = await (new Hubsrepository()).FindAsync(c => c.ConnectionId == trackedConnection.ConnectionId);
            // Update the client's last activity
            if (connection != null)
            {
                connection.LastActivity = DateTimeOffset.UtcNow;
                await (new Hubsrepository()).UpdateAsync(connection, connection.Id).ConfigureAwait(false);
            }                
        }

        // Now check all db connections to see if there's any zombies

        // Remove all connections that haven't been updated based on our threshold
        var hubRepository = new Hubsrepository();
        var zombies =await hubRepository.FindAllAsync(c => 
            SqlFunctions.DateDiff("ss", c.LastActivity, DateTimeOffset.UtcNow) >= _zombieThreshold);

        // We're doing ToList() since there's no MARS support on azure
        foreach (var connection in zombies.ToList())
        {
            await hubRepository.DeleteAsync(connection);
        }             
    }
}

我的集线器连接断开,重新连接看起来像

public override async Task OnConnected()
{
    var log = AppLogFactory.Create<WebApiApplication>();
    if (Context.QueryString["transport"] == "webSockets")
    {
        log.Info($"Connection is Socket");
    }

    if (Context.Headers.Any(kv => kv.Key == "CMSId"))
    {
        // Check For security
        var hederchecker = CryptLib.Decrypt(Context.Headers["CMSId"]);
        if (string.IsNullOrEmpty(hederchecker))
        {
            log.Info($"CMSId cannot be decrypted {Context.Headers["CMSId"]}");
            return;
        }
        log.Info($" {hederchecker} CMSId online at {DateTime.UtcNow} ");

        var user = await (new UserRepository()).FindAsync(u => u.CMSUserId == hederchecker);
        if (user != null)
            await (new Hubsrepository()).AddAsync(new HubConnection()
            {
                UserId = user.Id,
                ConnectionId = Context.ConnectionId,
                UserAgent = Context.Request.Headers["User-Agent"],
                LastActivity = DateTimeOffset.UtcNow
            }).ConfigureAwait(false);

        //_connections.Add(hederchecker, Context.ConnectionId);
    }
    return;
}

public override async Task OnDisconnected(bool stopCalled)
{
    try
    {
        //if (!stopCalled)
        {
            var hubRepo = (new Hubsrepository());

            var connection = await hubRepo.FindAsync(c => c.ConnectionId == Context.ConnectionId);
            if (connection != null)
            {
                var user = await (new UserRepository()).FindAsync(u => u.Id == connection.UserId);
                await hubRepo.DeleteAsync(connection);
                if (user != null)
                {
                    //var log = AppLogFactory.Create<WebApiApplication>();
                    //log.Info($"CMSId cannot be decrypted {cmsId}");

                    using (UserStatusRepository repo = new UserStatusRepository())
                    {
                        //TODO :: To be changed immediatley in next release , Date of change 22/02/2017 

                        var result = await (new CallLogRepository()).CallEvent(user.CMSUserId);
                        if (result.IsSuccess)
                        {
                            var log = AppLogFactory.Create<WebApiApplication>();
                            var isStudent = await repo.CheckIfStudent(user.CMSUserId);

                            log.Info($" {user.CMSUserId} CMSId Disconnected here Before Set offline  at {DateTime.UtcNow} ");

                            var output = await repo.OfflineUser(user.CMSUserId);

                            log.Info($" {user.CMSUserId} CMSId Disconnected here after Set offline  at {DateTime.UtcNow} ");

                            if (output)
                            {
                                log.Info($" {user.CMSUserId} CMSId Disconnected at {DateTime.UtcNow} ");

                                Clients.All.UserStatusChanged(user.CMSUserId, false, isStudent);
                            }
                        }
                    }
                }
            }
        }
    }
    catch (Exception e)
    {
        var log = AppLogFactory.Create<WebApiApplication>();
        log.Error($"CMSId cannot Faild to be offline {Context.ConnectionId} with error {e.Message}{Environment.NewLine}{e.StackTrace}");
    }
}

public override async Task OnReconnected()
{
    string name = Context.User.Identity.Name;

    var log = AppLogFactory.Create<WebApiApplication>();
    log.Info($" {name} CMSId Reconnected at {DateTime.UtcNow} ");

    var connection = await (new Hubsrepository()).FindAsync(c => c.ConnectionId == Context.ConnectionId);
    if (connection == null)
    {
        var user = await (new UserRepository()).FindAsync(u => u.CMSUserId == name);
        if (user != null)
            await (new Hubsrepository()).AddAsync(new HubConnection()
            {
                UserId = user.Id,
                ConnectionId = Context.ConnectionId,
                UserAgent = Context.Request.Headers["User-Agent"],
                LastActivity = DateTimeOffset.UtcNow
            }).ConfigureAwait(false);
    }
    else
    {
        connection.LastActivity = DateTimeOffset.UtcNow;
        await (new Hubsrepository()).UpdateAsync(connection, connection.Id).ConfigureAwait(false);
    }
}

所有测试用例都顺利通过,除非客户端的互联网中断连接保持超过 10 分钟,这是否与身份验证有关,或者我身边的任何配置错误任何帮助我真的不知道出了什么问题。客户端使用 websocket 传输

4

0 回答 0