我找到了解决方案。我不知道它是否是世界上最美丽的,但它确实有效。我创建了私有财产,所以每个用户可以有多个会话:
private ConcurrentDictionary<long, ChatServerUserSessions> _chatServerUserSessionInfoDictionary = new ConcurrentDictionary<long, ChatServerUserSessions>();
和会话类:
public class ChatServerUserSessions
{
public long UserId { get; set; }
public string UserName { get; set; }
public ConcurrentDictionary<string, ChatServerUserSessionInfo> Sessions { get; set; }
public object Lock { get; set; }
}
并且对于我创建的每个会话:
public class ChatServerUserSessionInfo : IObservable<ChatServerRoomActivityInfoBase>, IDisposable
{
public string SessionId { get; set; }
public List<long> SubscribedRoomIds { get; set; }
public DateTime SubscriptionTicketInvalidationDate { get; set; }
public Queue<ChatServerRoomActivityInfoBase> MessagesQueue { get; set; }
private IDisposable subscription;
private List<IObserver<ChatServerRoomActivityInfoBase>> observers;
private ChatServerUserSessions parentUserSessions;
public ChatServerUserSessionInfo(string sessionId, DateTime subscriptionTicketInvalidationDate, Subject<ChatServerRoomActivityInfoBase> chatServerRoomActivity, ChatServerUserSessions parentUserSessions)
{
this.SessionId = sessionId;
this.SubscribedRoomIds = new List<long>();
this.SubscriptionTicketInvalidationDate = subscriptionTicketInvalidationDate;
this.MessagesQueue = new Queue<ChatServerRoomActivityInfoBase>();
this.parentUserSessions = parentUserSessions;
subscription = chatServerRoomActivity.Subscribe(activity =>
{
lock (parentUserSessions.Lock)
{
if (this.SubscribedRoomIds.Contains(activity.RoomId))
{
this.MessagesQueue.Enqueue(activity);
foreach (var observer in observers)
{
observer.OnNext(activity);
}
}
}
});
observers = new List<IObserver<ChatServerRoomActivityInfoBase>>();
}
~ChatServerUserSessionInfo()
{
Dispose();
}
public void Dispose()
{
if (subscription != null)
{
subscription.Dispose();
subscription = null;
}
this.observers = null;
GC.SuppressFinalize(this);
}
public IDisposable Subscribe(IObserver<ChatServerRoomActivityInfoBase> observer)
{
lock (parentUserSessions.Lock)
{
this.observers.Add(observer);
return (IDisposable)new Subscription(this, observer);
}
}
private void Unsubscribe(IObserver<ChatServerRoomActivityInfoBase> observer)
{
lock (parentUserSessions.Lock)
{
if (this.observers == null)
{
return;
}
this.observers.Remove(observer);
}
}
private class Subscription : IDisposable
{
private ChatServerUserSessionInfo subject;
private IObserver<ChatServerRoomActivityInfoBase> observer;
public Subscription(ChatServerUserSessionInfo subject, IObserver<ChatServerRoomActivityInfoBase> observer)
{
this.subject = subject;
this.observer = observer;
}
public void Dispose()
{
IObserver<ChatServerRoomActivityInfoBase> observer = Interlocked.Exchange<IObserver<ChatServerRoomActivityInfoBase>>(ref this.observer, (IObserver<ChatServerRoomActivityInfoBase>)null);
if (observer == null)
{
return;
}
this.subject.Unsubscribe(observer);
this.subject = (ChatServerUserSessionInfo)null;
}
}
}
每个用户会话都有自己的 MessageQueue 并订阅了全局聊天室活动主题。ChatRoomActivityMessages 会为每个会话单独保留。这是检索消息的方法:
public void CheckForChatRoomsActivityAsync(long userId, string userName, Action<List<ChatServerRoomActivityInfoBase>> onChatRoomActivity)
{
var sessionId = GetCurrentSessionId();
var chatServerUserSessions = GetChatServerUserSessions(userId, userName);
lock (chatServerUserSessions.Lock)
{
var userSession = GetChatServerUserSessionInfo(sessionId, chatServerUserSessions);
ProlongSubscriptions(userSession);
if (userSession.MessagesQueue.Count > 0)
{
var activities = new List<ChatServerRoomActivityInfoBase>();
while (userSession.MessagesQueue.Count > 0)
{
activities.Add(userSession.MessagesQueue.Dequeue());
}
onChatRoomActivity(activities);
}
else
{
var queued = ThreadPool.QueueUserWorkItem(new WaitCallback(parm =>
{
var activities = new List<ChatServerRoomActivityInfoBase>();
var wait = new AutoResetEvent(false);
using (var subscriber = userSession.Subscribe(activity =>
{
lock (chatServerUserSessions.Lock)
{
activities.Add(activity);
userSession.MessagesQueue.Dequeue();
wait.Set();
}
}))
{
wait.WaitOne(TimeSpan.FromSeconds(CheckForActivityMaxWaitSeconds));
}
((Action<List<ChatServerRoomActivityInfoBase>>)parm)(activities);
}), onChatRoomActivity);
if (!queued)
{
onChatRoomActivity(new List<ChatServerRoomActivityInfoBase>());
}
}
}
}