一旦队列填满,WebSocket异步发送可能导致阻止发送

我有非常简单的基于Jetty的websockets服务器,负责流式传输小二进制消息以连接客户端。

为避免在服务器端出现任何阻塞,我使用的是sendBytesByFuture方法。

在将负载从2个客户端增加到20个后,它们将停止接收任何数据。 在故障排除期间,我决定打开同步发送方法,最后得到了可能的原因:

java.lang.IllegalStateException: Blocking message pending 10000 for BLOCKING at org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint.lockMsg(WebSocketRemoteEndpoint.java:130) at org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint.sendBytes(WebSocketRemoteEndpoint.java:244) 

客户端在接收数据时不进行任何计算,因此可能无法成为缓慢的加入者。

所以我想知道我该怎么做才能解决这个问题? (使用Jetty 9.2.3)

如果同步发送出现错误消息,则您有多个线程尝试在同一RemoteEndpoint上发送消息 – 这是协议不允许的。 一次只能发送一条消息。 (基本上没有同步发送的队列)

如果异步发送出现错误消息,则表示您在队列中等待发送消息,但您仍在尝试编写更多异步消息。

尽量不要同时混合同步和异步(很容易意外地将输出变成无效的协议流)

使用Java Futures:

您将需要使用sendBytesByFuture()sendStringByFuture()方法返回时提供的Future对象来validation消息是否实际已发送(可能是错误),并且如果有足够的开始排队在远程端点可以赶上之前发送更多消息后,你没有回复。

标准的Future行为和技术适用于此处。

使用Jetty回调:

sendBytes(ByteBuffer,WriteCallback)sendString(String,WriteCallback)方法中也有WriteCallback行为sendString(String,WriteCallback)它们会在成功/错误时调用您自己的代码,您可以在其中放置一些关于您发送的内容的逻辑(限制它,发送)它更慢,排队,过滤它,丢弃一些消息,优先处理消息等等,无论你需要什么)

使用阻止:

或者你可以使用阻塞发送来永远不会有太多的消息排队。