0

我正在尝试创建一个非常简单的通知系统(不想使用 SignalIR 或其他东西)。我有以下测试代码:

客户端:

var source = new EventSource('/notifications.axd');

source.onopen = function () {
    Console.log("Connection open");
};

source.onerror = function () {
    Console.log("Connection error");
};

source.onmessage = function (event) {
    Console.log("Message: " + event.data);
};

服务器端:

public class NotificationMessage {
    public NotificationMessage() {
        Id = Guid.NewGuid().ToString();
    }

    public string Id { get; private set; }
}

public class NotificationsHandler : HttpTaskAsyncHandler {
    private const string CONTENT_TYPE = "text/event-stream";

    private sealed class NotificationItem {
        public ConcurrentQueue<NotificationMessage> Messages;
        public CancellationTokenSource CancellationTokenSource;
    }

    private static readonly ConcurrentDictionary<string, NotificationItem> _tasks =
        new ConcurrentDictionary<string, NotificationItem>();

    public static void Notify(string hostId, string userId, NotificationMessage message) {
        NotificationItem item;
        if (!_tasks.TryGetValue(string.Format("{0}|{1}", hostId, userId), out item)) {
            return;
        }

        var tokenSource = item.CancellationTokenSource;
        item.Messages.Enqueue(message);
        item.CancellationTokenSource = new CancellationTokenSource();
        tokenSource.Cancel();
    }

    public override async Task ProcessRequestAsync(HttpContext context) {
        HttpRequest request = context.Request;

        NotificationItem item = _tasks.GetOrAdd(
            string.Format("{0}|{1}", request.Url.Host, CsSession.Data.CurrentUser.Id), 
            k => new NotificationItem {
                Messages = new ConcurrentQueue<NotificationMessage>(),
                CancellationTokenSource = new CancellationTokenSource()
            }
        );

        HttpResponse response = context.Response;

        response.ContentType = CONTENT_TYPE;
        response.CacheControl = "no-cache";
        response.ContentEncoding = Encoding.UTF8;
        response.AppendHeader("connection", "keep-alive");

        response.BufferOutput = false;
        bool supportsAsyncFlush = response.SupportsAsyncFlush;
        bool shouldPing = true;

        while (response.IsClientConnected) {
            try {
                NotificationMessage message = null;
                if ((!item.Messages.IsEmpty && item.Messages.TryDequeue(out message)) || shouldPing) {
                    response.Write(string.Format("data:{0}\n\n", message == null ? "{}" : JsonMapper.Serialize(message)));

                    if (supportsAsyncFlush) {
                        await Task.Factory.FromAsync(response.BeginFlush, response.EndFlush, null);

                    } else {
                        response.Flush();
                    }
                } 

            } catch (Exception) {
                break;
            }

            var delay = Task.Delay(15000, item.CancellationTokenSource.Token);
            await delay;
            shouldPing = delay.Status == TaskStatus.RanToCompletion;
        }
    }
}

问题是:以上不起作用。我有两个问题:

1)当客户端连接时,我收到一个空包(没关系)。然后,如果我没有将任何消息排入队列,在等待 Task.Delay 之后,循环会尝试再次写入空消息,但我不知道在哪里。response.Write 行永远不会返回(并且客户端没有收到任何内容)。

2)如果我写入队列,由于某种原因连接被丢弃。如果我在等待延迟之后在该行上放置一个断点,则该行永远不会执行(而我的逻辑另有说明:))。如果我取消令牌,延迟任务应该退出,但似乎它正在中止整个处理程序?

4

0 回答 0