服务器与Jersey发送事件:客户端丢弃后,EventOutput未关闭

我正在使用jersey来实施SSE场景。

服务器保持连接活动。 并定期将数据推送给客户。

在我的方案中,存在连接限制,只有一定数量的客户端可以同时订阅服务器。

因此,当新客户端尝试订阅时,我会进行检查(EventOutput.isClosed)以查看是否有任何旧连接不再处于活动状态,因此可以为新连接腾出空间。

但EventOutput.isClosed的结果始终为false,除非客户端显式调用EventSource的close。 这意味着如果客户端意外掉线(断电或互联网切断),它仍然会占用连接,新客户端无法订阅。

有没有解决这个问题?

@CuiPengFei,

所以在我的旅行中试图找到自己的答案我偶然发现了一个存储库,它解释了如何正常处理来自断开连接的客户端的连接。

将所有SSE EventOutput逻辑封装到Service / Manager中。 在这里,他们启动一个线程,检查客户端是否已关闭EventOutput。 如果是这样,他们正式关闭连接(EventOutput#close())。 如果不是,他们会尝试写入流。 如果它抛出一个Exception,那么客户端在没有关闭的情况下断开连接并处理它的关闭。 如果写入成功,则EventOutput将返回到池,因为它仍然是活动连接。

回购(和实际类)可在此处获得 。 如果删除了回购,我还包括没有import的类。

请注意,它们将此绑定到Singleton。 商店应该是全球唯一的。

public class SseWriteManager { private final ConcurrentHashMap connectionMap = new ConcurrentHashMap<>(); private final ScheduledExecutorService messageExecutorService; private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class); public SseWriteManager() { messageExecutorService = Executors.newScheduledThreadPool(1); messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS); } public void addSseConnection(String id, EventOutput eventOutput) { logger.info("adding connection for id={}.", id); connectionMap.put(id, eventOutput); } private class messageProcessor implements Runnable { @Override public void run() { try { Iterator> iterator = connectionMap.entrySet().iterator(); while (iterator.hasNext()) { boolean remove = false; Map.Entry entry = iterator.next(); EventOutput eventOutput = entry.getValue(); if (eventOutput != null) { if (eventOutput.isClosed()) { remove = true; } else { try { logger.info("writing to id={}.", entry.getKey()); eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build()); } catch (Exception ex) { logger.info(String.format("write failed to id=%s.", entry.getKey()), ex); remove = true; } } } if (remove) { // we are removing the eventOutput. close it is if it not already closed. if (!eventOutput.isClosed()) { try { eventOutput.close(); } catch (Exception ex) { // do nothing. } } iterator.remove(); } } } catch (Exception ex) { logger.error("messageProcessor.run threw exception.", ex); } } } public void shutdown() { if (messageExecutorService != null && !messageExecutorService.isShutdown()) { logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown."); messageExecutorService.shutdown(); } else { logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown()."); } }} 

想要提供这方面的最新消息:

发生的事情是客户端(js)上的eventSource从未进入readyState“1”,除非我们在添加新订阅后立即进行广播。 即使在此状态下,客户端也可以接收从服务器推送的数据。 添加调用以播放简单的“OK”消息有助于将eventSource踢入readyState 1。

从客户端关闭连接; 要积极清理资源,只需在客户端关闭eventSource就无济于事了。 我们必须对服务器进行另一次ajax调用以强制服务器进行广播。 当强制广播时,jersey将清理不再存在的连接,并将依次释放资源(CLOSE_WAIT中的连接)。 如果没有连接将在CLOSE_WAIT中停留,直到下一个广播发生。