我正在为现有的 SignalR 集线器(Hub)添加一个 MongoDB SignalR(版本 2.2.1)背板。在没有背板的情况下,集线器上的 OnConnected 方法会被正常调用;但一旦注册了背板,OnConnected 方法就不再被调用。通过调试器可以看到,当客户端订阅时,我的背板 ScaleoutMessageBus 的订阅(Subscribe)和发送(Send)方法都被调用了,但 OnConnected 方法始终没有被调用。
我的 OnConnected 方法:
/// <summary>
/// Registers the connecting client with the hub connection manager, keyed by the
/// authenticated user's name and the SignalR connection id, then defers to the
/// base implementation.
/// </summary>
/// <returns>The task returned by the base <c>OnConnected</c> implementation.</returns>
public override Task OnConnected()
{
    // Resolve the manager first, then read the user name and connection id,
    // which is the same order the values are needed by AddConnection.
    NestedContainer()
        .GetInstance<IHubConnectionManager>()
        .AddConnection<T>(Context.User.Identity.Name, Context.ConnectionId);

    return base.OnConnected();
}
我的 ScaleoutMessageBus:
/// <summary>
/// SignalR scale-out message bus backed by a capped MongoDB collection.
/// Outgoing message batches are saved as <c>MongoMessage</c> documents; a background
/// loop tails the collection and passes every unprocessed document to <c>OnReceived</c>.
/// </summary>
public class MongoMessageBus : ScaleoutMessageBus
{
    private readonly MongoCollection<MongoMessage> _collection;
    private readonly ILog _log;
    private readonly Task _backgroundTask;

    // Owns the token observed by the background loop so Dispose can actually stop it.
    private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
    private bool _disposed;

    /// <summary>
    /// Ensures the capped backing collection exists and starts the background loop
    /// that tails it for new messages.
    /// </summary>
    /// <param name="resolver">Dependency resolver passed through to the base bus.</param>
    /// <param name="configuration">Scale-out settings, including the collection name and size limits.</param>
    /// <exception cref="InvalidOperationException">The configured collection exists but is not capped.</exception>
    public MongoMessageBus(IDependencyResolver resolver, MongoScaleoutConfiguration configuration)
        : base(resolver, configuration)
    {
        _log = GlobalValues.Container.GetInstance<ILog>();
        var database = GlobalValues.Container.GetInstance<IMongoDatabaseProvider>().GetSwimlaneDatabase();
        CreateCappedCollection(database, configuration);
        _collection = database.GetCollection<MongoMessage>(configuration.CollectionName);
        if (!_collection.IsCapped())
        {
            throw new InvalidOperationException($"Mongo collection {configuration.CollectionName} must be capped.");
        }

        // The loop blocks for the lifetime of the bus, so request a dedicated thread
        // (LongRunning) instead of occupying a thread-pool worker.
        var cancellationToken = _cancellation.Token;
        _backgroundTask = Task.Factory.StartNew(
            () => ProcessMessages(cancellationToken),
            cancellationToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Saves a batch of messages for a specific stream to the Mongo collection.
    /// </summary>
    /// <param name="streamIndex">Index of the stream the batch belongs to.</param>
    /// <param name="messages">Messages to persist as a single document.</param>
    /// <returns>A task that completes once the document has been saved.</returns>
    protected override Task Send(int streamIndex, IList<Message> messages)
    {
        return Task.Run(() =>
        {
            var message = new MongoMessage(streamIndex, messages);
            _collection.Save(message);
        });
    }

    /// <summary>
    /// Saves a batch of messages to the Mongo collection without an explicit stream index.
    /// </summary>
    /// <param name="messages">Messages to persist as a single document.</param>
    /// <returns>A task that completes once the document has been saved.</returns>
    protected override Task Send(IList<Message> messages)
    {
        return Task.Run(() =>
        {
            var message = new MongoMessage(messages);
            _collection.Save(message);
        });
    }

    /// <summary>
    /// Signals the background loop to stop and releases the cancellation source.
    /// Safe to call more than once.
    /// </summary>
    /// <param name="disposing">True when called from <c>Dispose()</c> rather than a finalizer.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing && !_disposed)
        {
            _disposed = true;

            // Request cancellation instead of calling Task.Dispose(): disposing a task
            // that has not completed throws InvalidOperationException, and the loop
            // never completes on its own.
            _cancellation.Cancel();
            _cancellation.Dispose();
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Tails the capped collection for unprocessed messages, forwards each one to
    /// <c>OnReceived</c>, and marks it as processed. Runs until cancellation is
    /// requested or an error occurs; after an error the loop logs and exits.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the loop when cancelled.</param>
    private void ProcessMessages(CancellationToken cancellationToken)
    {
        try
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();
                var messages = _collection.Find(Query<MongoMessage>.EQ(i => i.Processed, false))
                    .SetFlags(QueryFlags.TailableCursor | QueryFlags.NoCursorTimeout | QueryFlags.AwaitData);
                foreach (var message in messages)
                {
                    // Stop promptly between messages once the bus is being disposed.
                    cancellationToken.ThrowIfCancellationRequested();

                    // NOTE(review): the id is derived from the ObjectId creation time, which
                    // presumably has coarse (one-second) resolution, so documents created close
                    // together may share an id - confirm this satisfies the id ordering that
                    // SignalR scale-out expects.
                    var id = (ulong) message.Id.CreationTime.Ticks;
                    var msg = ScaleoutMessage.FromBytes(message.Value);
                    OnReceived(message.StreamIndex, id, msg);

                    var query = Query<MongoMessage>.EQ(i => i.Id, message.Id);
                    var update = Update<MongoMessage>.Set(i => i.Processed, true);
                    _collection.Update(query, update);
                }
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Expected during shutdown; not an error.
        }
        catch (Exception ex)
        {
            _log.Write.Error(ex, $"An error occurred processing the Mongo message bus: {ex.Message}");
        }
    }

    /// <summary>
    /// Creates the configured collection as a capped collection if it does not exist yet.
    /// An existing collection is left untouched.
    /// </summary>
    /// <param name="database">Database that should contain the collection.</param>
    /// <param name="configuration">Supplies the collection name, maximum size and maximum document count.</param>
    private void CreateCappedCollection(MongoDatabase database, MongoScaleoutConfiguration configuration)
    {
        if (database.CollectionExists(configuration.CollectionName))
        {
            return;
        }

        var collectionOptions = CollectionOptions.SetCapped(true)
            .SetMaxSize(configuration.CollectionMaxSize)
            .SetMaxDocuments(configuration.MaxDocuments);
        database.CreateCollection(configuration.CollectionName, collectionOptions);
    }
}
我只在一台服务器上进行测试,所以我预期 OnConnected 事件应该会在该服务器上被调用。要确保在使用背板时 OnConnected 仍然被调用,我是否还需要做一些额外的处理?