1

我正在使用球衣来实现 SSE 场景。

服务器保持连接处于活动状态。并定期向客户端推送数据。

在我的场景中,有一个连接限制,只有一定数量的客户端可以同时订阅服务器。

因此,当新客户端尝试订阅时,我会检查(EventOutput.isClosed) 以查看是否有任何旧连接不再处于活动状态,以便它们可以为新连接腾出空间。

但是EventOutput.isClosed的结果总是假的,除非客户端显式调用EventSource的close。这意味着如果客户端意外掉线(断电或互联网中断),它仍然会占用连接,并且新客户端无法订阅。

有解决办法吗?

4

2 回答 2

6

@崔鹏飞,

因此,在我试图自己寻找答案的旅途中,我偶然发现了一个存储库,该存储库解释了如何优雅地清理来自断开连接的客户端的连接。

将所有 SSE EventOutput 逻辑封装到服务/管理器中。在这种情况下,他们启动了一个线程来检查 EventOutput 是否已被客户端关闭。如果是这样,他们会正式关闭连接(EventOutput#close())。如果不是,他们会尝试写入流。如果它抛出一个异常,那么客户端在没有关闭的情况下断开连接,它会处理关闭它。如果写入成功,则 EventOutput 将返回到池中,因为它仍然是活动连接。

回购(和实际课程)可在此处获得。我还在下面包含了没有导入的类,以防回购被删除。

请注意,他们将此绑定到单例。商店应该是全球唯一的。

public class SseWriteManager {

private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();

private final ScheduledExecutorService messageExecutorService;

private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);

public SseWriteManager() {
    messageExecutorService = Executors.newScheduledThreadPool(1);
    messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS);
}

public void addSseConnection(String id, EventOutput eventOutput) {
    logger.info("adding connection for id={}.", id);
    connectionMap.put(id, eventOutput);
}

private class messageProcessor implements Runnable {
    @Override
    public void run() {
        try {
            Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator();
            while (iterator.hasNext()) {
                boolean remove = false;
                Map.Entry<String, EventOutput> entry = iterator.next();
                EventOutput eventOutput = entry.getValue();
                if (eventOutput != null) {
                    if (eventOutput.isClosed()) {
                        remove = true;
                    } else {
                        try {
                            logger.info("writing to id={}.", entry.getKey());
                            eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
                        } catch (Exception ex) {
                            logger.info(String.format("write failed to id=%s.", entry.getKey()), ex);
                            remove = true;
                        }
                    }
                }
                if (remove) {
                    // we are removing the eventOutput. close it is if it not already closed.
                    if (!eventOutput.isClosed()) {
                        try {
                            eventOutput.close();
                        } catch (Exception ex) {
                            // do nothing.
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (Exception ex) {
            logger.error("messageProcessor.run threw exception.", ex);
        }
    }
}

public void shutdown() {
    if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
        logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
        messageExecutorService.shutdown();
    } else {
        logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
    }

}} 
于 2015-11-20T03:35:32.763 回答
2

想提供有关此的更新:

发生的事情是客户端(js)上的 eventSource 从未进入 readyState '1',除非我们在添加新订阅后立即进行广播。即使在这种状态下,客户端也可以接收从服务器推送的数据。添加调用来广播简单的“OK”消息有助于将 eventSource 踢到 readyState 1。

从客户端关闭连接;要主动清理资源,仅在客户端关闭 eventSource 无济于事。我们必须再次对服务器进行 ajax 调用以强制服务器进行广播。当强制广播时,jersey 会清理不再活跃的连接,并依次释放资源(CLOSE_WAIT 中的连接)。如果没有,连接将停留在 CLOSE_WAIT 中,直到下一次广播发生。

于 2015-05-25T20:56:54.490 回答