2

在此示例的基础上,我使用 Rx 在 .NET 中实现了一个简单的聊天: https ://blogs.claritycon.com/blog/2011/04/roll-your-own-mvc-3-long-polling-chat -地点/

有一种方法可以使用 LongPolling 等待新消息的到来:

public static void CheckForMessagesAsync(Action<List<MessageInfo>> onMessages)
{
    var queued = ThreadPool.QueueUserWorkItem(new WaitCallback(parm =>
    {
        var msgs = new List<MessageInfo>();
        var wait = new AutoResetEvent(false);
        using (var subscriber = _messages.Subscribe(msg =>
                                        {
                                            msgs.Add(msg);
                                            wait.Set();
                                        }))
        {
            // Wait for the max seconds for a new msg
            wait.WaitOne(TimeSpan.FromSeconds(MaxWaitSeconds));
        }

        ((Action<List<MessageInfo>>)parm)(msgs);
    }), onMessages);

    if (!queued)
        onMessages(new List<MessageInfo>());
}

使用这种方法,我会丢失在断开连接和处置观察者以及重新连接之间出现的消息。如何正确实施此机制以不丢失这些消息?

4

1 回答 1

0

我找到了解决方案。我不知道它是否是世界上最美丽的,但它确实有效。我创建了私有财产,所以每个用户可以有多个会话:

private ConcurrentDictionary<long, ChatServerUserSessions> _chatServerUserSessionInfoDictionary = new ConcurrentDictionary<long, ChatServerUserSessions>();

和会话类:

public class ChatServerUserSessions
{
    public long UserId { get; set; }

    public string UserName { get; set; }

    public ConcurrentDictionary<string, ChatServerUserSessionInfo> Sessions { get; set; }

    public object Lock { get; set; }
}

并且对于我创建的每个会话:

public class ChatServerUserSessionInfo : IObservable<ChatServerRoomActivityInfoBase>, IDisposable
{
    public string SessionId { get; set; }

    public List<long> SubscribedRoomIds { get; set; }

    public DateTime SubscriptionTicketInvalidationDate { get; set; }

    public Queue<ChatServerRoomActivityInfoBase> MessagesQueue { get; set; }

    private IDisposable subscription;
    private List<IObserver<ChatServerRoomActivityInfoBase>> observers;
    private ChatServerUserSessions parentUserSessions;

    public ChatServerUserSessionInfo(string sessionId, DateTime subscriptionTicketInvalidationDate, Subject<ChatServerRoomActivityInfoBase> chatServerRoomActivity, ChatServerUserSessions parentUserSessions)
    {
        this.SessionId = sessionId;
        this.SubscribedRoomIds = new List<long>();
        this.SubscriptionTicketInvalidationDate = subscriptionTicketInvalidationDate;
        this.MessagesQueue = new Queue<ChatServerRoomActivityInfoBase>();
        this.parentUserSessions = parentUserSessions;

        subscription = chatServerRoomActivity.Subscribe(activity =>
        {
            lock (parentUserSessions.Lock)
            {
                if (this.SubscribedRoomIds.Contains(activity.RoomId))
                {
                    this.MessagesQueue.Enqueue(activity);

                    foreach (var observer in observers)
                    {
                        observer.OnNext(activity);
                    }
                }
            }
        });

        observers = new List<IObserver<ChatServerRoomActivityInfoBase>>();
    }

    ~ChatServerUserSessionInfo()
    {
        Dispose();
    }

    public void Dispose()
    {
        if (subscription != null)
        {
            subscription.Dispose();
            subscription = null;
        }

        this.observers = null;

        GC.SuppressFinalize(this);
    }

    public IDisposable Subscribe(IObserver<ChatServerRoomActivityInfoBase> observer)
    {
        lock (parentUserSessions.Lock)
        {
            this.observers.Add(observer);
            return (IDisposable)new Subscription(this, observer);
        }
    }

    private void Unsubscribe(IObserver<ChatServerRoomActivityInfoBase> observer)
    {
        lock (parentUserSessions.Lock)
        {
            if (this.observers == null)
            {
                return;
            }

            this.observers.Remove(observer);
        }
    }

    private class Subscription : IDisposable
    {
        private ChatServerUserSessionInfo subject;
        private IObserver<ChatServerRoomActivityInfoBase> observer;

        public Subscription(ChatServerUserSessionInfo subject, IObserver<ChatServerRoomActivityInfoBase> observer)
        {
            this.subject = subject;
            this.observer = observer;
        }

        public void Dispose()
        {
            IObserver<ChatServerRoomActivityInfoBase> observer = Interlocked.Exchange<IObserver<ChatServerRoomActivityInfoBase>>(ref this.observer, (IObserver<ChatServerRoomActivityInfoBase>)null);
            if (observer == null)
            {
                return;
            }

            this.subject.Unsubscribe(observer);
            this.subject = (ChatServerUserSessionInfo)null;
        }
    }
}

每个用户会话都有自己的 MessageQueue 并订阅了全局聊天室活动主题。ChatRoomActivityMessages 会为每个会话单独保留。这是检索消息的方法:

public void CheckForChatRoomsActivityAsync(long userId, string userName, Action<List<ChatServerRoomActivityInfoBase>> onChatRoomActivity)
    {
        var sessionId = GetCurrentSessionId();
        var chatServerUserSessions = GetChatServerUserSessions(userId, userName);

        lock (chatServerUserSessions.Lock)
        {
            var userSession = GetChatServerUserSessionInfo(sessionId, chatServerUserSessions);
            ProlongSubscriptions(userSession);

            if (userSession.MessagesQueue.Count > 0)
            {
                var activities = new List<ChatServerRoomActivityInfoBase>();
                while (userSession.MessagesQueue.Count > 0)
                {
                    activities.Add(userSession.MessagesQueue.Dequeue());
                }

                onChatRoomActivity(activities);
            }
            else
            {
                var queued = ThreadPool.QueueUserWorkItem(new WaitCallback(parm =>
                {
                    var activities = new List<ChatServerRoomActivityInfoBase>();
                    var wait = new AutoResetEvent(false);

                    using (var subscriber = userSession.Subscribe(activity =>
                    {
                        lock (chatServerUserSessions.Lock)
                        {
                            activities.Add(activity);
                            userSession.MessagesQueue.Dequeue();

                            wait.Set();
                        }
                    }))
                    {
                        wait.WaitOne(TimeSpan.FromSeconds(CheckForActivityMaxWaitSeconds));
                    }

                    ((Action<List<ChatServerRoomActivityInfoBase>>)parm)(activities);
                }), onChatRoomActivity);

                if (!queued)
                {
                    onChatRoomActivity(new List<ChatServerRoomActivityInfoBase>());
                }
            }
        }
    }
于 2012-06-29T10:09:11.990 回答