2

我正在为现有的 SignalR 集线器添加一个基于 MongoDB 的 SignalR(版本 2.2.1)背板。不使用背板时,集线器上的 OnConnected 方法会被正常调用;但一旦注册了背板,OnConnected 方法就不再被调用。通过调试器可以看到,当客户端订阅时,我的背板 ScaleoutMessageBus 的 Subscribe 和 Send 方法都被调用了,但 OnConnected 方法始终没有被调用。

我的 OnConnected 方法:

/// <summary>
/// Records the newly connected client against the current user's name, then
/// defers to the base hub implementation.
/// </summary>
/// <returns>The task returned by the base <c>OnConnected</c> implementation.</returns>
public override Task OnConnected()
{
    // Resolve the connection tracker from the nested container before touching the caller context.
    var tracker = NestedContainer().GetInstance<IHubConnectionManager>();

    string connectedUser = Context.User.Identity.Name;
    string connectionId = Context.ConnectionId;

    // T is declared outside this block — presumably the enclosing hub's type parameter; confirm.
    tracker.AddConnection<T>(connectedUser, connectionId);

    return base.OnConnected();
}

我的 ScaleoutMessageBus:

/// <summary>
/// SignalR scale-out message bus that stores outgoing messages in a capped MongoDB
/// collection and reads them back on a background task using a tailable cursor.
/// </summary>
public class MongoMessageBus : ScaleoutMessageBus
{
    private readonly MongoCollection<MongoMessage> _collection;
    private readonly ILog _log;

    // Owns the token observed by the background reader so Dispose can actually stop it.
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly Task _backgroundTask;

    /// <summary>
    /// Ensures the capped collection exists, verifies that it is capped, and starts the
    /// background task that reads unprocessed messages.
    /// </summary>
    /// <param name="resolver">Dependency resolver passed through to the base bus.</param>
    /// <param name="configuration">Scale-out configuration (collection name, size limits).</param>
    /// <exception cref="InvalidOperationException">
    /// The configured collection exists but is not capped.
    /// </exception>
    public MongoMessageBus(IDependencyResolver resolver, MongoScaleoutConfiguration configuration)
        : base(resolver, configuration)
    {
        _log = GlobalValues.Container.GetInstance<ILog>();
        var database = GlobalValues.Container.GetInstance<IMongoDatabaseProvider>().GetSwimlaneDatabase();
        CreateCappedCollection(database, configuration);

        _collection = database.GetCollection<MongoMessage>(configuration.CollectionName);

        // Tailable cursors only work on capped collections, so fail fast otherwise.
        if (!_collection.IsCapped())
        {
            throw new InvalidOperationException($"Mongo collection {configuration.CollectionName} must be capped.");
        }

        // NOTE(review): this class never calls Open(streamIndex) itself. Confirm against the
        // ScaleoutMessageBus documentation whether the stream(s) must be opened explicitly
        // once the backing store is reachable.

        // A default CancellationToken can never be cancelled; use a source we control.
        _cancellationTokenSource = new CancellationTokenSource();
        var cancellationToken = _cancellationTokenSource.Token;

        // The reader loops (and blocks) for the lifetime of the bus, so mark it LongRunning
        // instead of tying up a regular thread-pool worker.
        _backgroundTask = Task.Factory.StartNew(
            () => ProcessMessages(cancellationToken),
            cancellationToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Saves the given messages for a specific stream as a single document.
    /// </summary>
    /// <param name="streamIndex">Index of the scale-out stream the messages belong to.</param>
    /// <param name="messages">Messages to persist.</param>
    /// <returns>A task that completes once the document has been saved.</returns>
    protected override Task Send(int streamIndex, IList<Message> messages)
    {
        return Task.Run(() =>
        {
            var message = new MongoMessage(streamIndex, messages);
            _collection.Save(message);
        });
    }

    /// <summary>
    /// Saves the given messages as a single document without an explicit stream index.
    /// </summary>
    /// <param name="messages">Messages to persist.</param>
    /// <returns>A task that completes once the document has been saved.</returns>
    protected override Task Send(IList<Message> messages)
    {
        return Task.Run(() =>
        {
            var message = new MongoMessage(messages);
            _collection.Save(message);
        });
    }

    /// <summary>
    /// Signals the background reader to stop and disposes the base bus.
    /// </summary>
    /// <param name="disposing">True when called from <c>Dispose()</c>.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Task.Dispose throws InvalidOperationException while the task is still running,
            // which is the normal state here. Request cancellation instead and let the loop
            // exit on its own. The source is deliberately not disposed so that a repeated
            // Dispose call stays harmless.
            _cancellationTokenSource.Cancel();
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Background loop: repeatedly opens a tailable cursor over unprocessed documents,
    /// forwards each one to <c>OnReceived</c>, and marks it as processed.
    /// Exits quietly on cancellation; any other exception is logged and ends the loop.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the loop when cancelled.</param>
    private void ProcessMessages(CancellationToken cancellationToken)
    {
        try
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                var messages = _collection.Find(Query<MongoMessage>.EQ(i => i.Processed, false))
                    .SetFlags(QueryFlags.TailableCursor | QueryFlags.NoCursorTimeout | QueryFlags.AwaitData);

                foreach (var message in messages)
                {
                    // Check between documents too, so a busy cursor does not delay shutdown.
                    cancellationToken.ThrowIfCancellationRequested();

                    // NOTE(review): the id is derived from the ObjectId creation time, which
                    // looks like it could repeat for documents created close together —
                    // confirm this satisfies the ordering the scale-out bus expects.
                    var id = (ulong) message.Id.CreationTime.Ticks;
                    var msg = ScaleoutMessage.FromBytes(message.Value);
                    OnReceived(message.StreamIndex, id, msg);

                    var query = Query<MongoMessage>.EQ(i => i.Id, message.Id);
                    var update = Update<MongoMessage>.Set(i => i.Processed, true);
                    _collection.Update(query, update);
                }
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Expected during Dispose; not an error.
        }
        catch (Exception ex)
        {
            _log.Write.Error(ex, $"An error occurred processing the Mongo message bus: {ex.Message}");
        }
    }

    /// <summary>
    /// Creates the configured collection as a capped collection if it does not exist yet.
    /// An existing collection is left untouched.
    /// </summary>
    /// <param name="database">Database in which the collection lives.</param>
    /// <param name="configuration">Supplies the collection name and size limits.</param>
    private void CreateCappedCollection(MongoDatabase database, MongoScaleoutConfiguration configuration)
    {
        if (!database.CollectionExists(configuration.CollectionName))
        {
            var collectionOptions = CollectionOptions.SetCapped(true)
                .SetMaxSize(configuration.CollectionMaxSize)
                .SetMaxDocuments(configuration.MaxDocuments);
            database.CreateCollection(configuration.CollectionName, collectionOptions);
        }
    }
}

我只在一台服务器上进行测试,所以我预期应该能看到该服务器上的 OnConnected 事件被调用。使用背板时,我是否需要做一些额外的处理,才能确保 OnConnected 被调用?

4

0 回答 0