Jersey SSE: eventOutput.write throws NullPointerException after sending the first message

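I have implemented a RESTful web interface with Jersey that forwards messages received from an internal JMS publisher to external clients over HTTP. I have managed to get a test message out to a Java client, but the thread throws a NullPointerException before write() finishes executing, which closes the connection and prevents any further communication.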

Here is my resource class:

    @GET
    @Path("/stream_data")
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput getServerSentEvents(@Context ServletContext context){
        final EventOutput eventOutput = new EventOutput();
        new Thread(
            new ObserverThread(eventOutput, (MService) context.getAttribute("instance"))
        ).start();
        return eventOutput;
    }

Here is my thread's run method:

    public class ObserverThread implements Observer, Runnable {
        //constructor sets eventOutput & mService objects
        //mService notifyObservers() called when JMS message received
        //text added to Thread's message queue to await sending to client

        public void run() {
            try {
                String message = "{'symbol':'test','entryType'='0','price'='test'}";
                Thread.sleep(1000);
                OutboundEvent.Builder builder = new OutboundEvent.Builder();
                builder.mediaType(MediaType.APPLICATION_JSON_TYPE);
                builder.data(String.class, message);
                OutboundEvent event = builder.build();
                eventOutput.write(event);
                System.out.println(">>>>>>SSE CLIENT HAS BEEN REGISTERED!");
                mService.addObserver(this);
                while(!eventOutput.isClosed()){
                    if(!updatesQ.isEmpty()){
                        pushUpdate(updatesQ.dequeue());
                    }
                }
                System.out.println("<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!");
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

Here is my client code:

    Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
    WebTarget target = client.target(url);
    EventInput eventInput = target.request().get(EventInput.class);
    try {
        while (!eventInput.isClosed()) {
            eventInput.setChunkType(MediaType.WILDCARD_TYPE);
            final InboundEvent inboundEvent = eventInput.read();
            if (inboundEvent != null) {
                String theString = inboundEvent.readData();
                System.out.println(theString + "\n");
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

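The test message "{'symbol':'test','entryType'='0','price'='test'}" is printed to the client console, but the server then throws a NullPointerException before it can print the ">>>>>>SSE CLIENT HAS BEEN REGISTERED!" message. This closes the connection, so the client exits its while loop and stops listening for updates.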

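I converted the project to the webapp 3.0 facet so that I could add the async-supported tag to web.xml, but I get the same null pointer error. I am inclined to think it is caused by the servlet ending the Request/Response objects once the first message has been returned, as the stack trace suggests: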

    Exception in thread "Thread-20" java.lang.NullPointerException
        at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:741)
        at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
        at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299)
        at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:981)
        at org.apache.coyote.Response.action(Response.java:183)
        at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314)
        at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
        at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:98)
        at org.glassfish.jersey.message.internal.CommittingOutputStream.flush(CommittingOutputStream.java:292)
        at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:241)
        at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:192)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:242)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:345)
        at org.glassfish.jersey.server.ChunkedOutput.flushQueue(ChunkedOutput.java:192)
        at org.glassfish.jersey.server.ChunkedOutput.write(ChunkedOutput.java:182)
        at com.bpc.services.service.ObserverThread.run(MarketObserverThread.java:32)
        at java.lang.Thread.run(Thread.java:745)
    <<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!

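I also tried testing with an SSE broadcaster. In that case I did not see any exception thrown, but the connection was closed as soon as the first message was received, which leads me to believe that something in the servlet is forcing the connection to close. Can anyone tell me how to debug this on the server side?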
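The trace starts with Exception in thread "Thread-20", so the NullPointerException escaped run() without being caught: the catch blocks in ObserverThread only handle IOException and InterruptedException. One generic way to make such failures visible on the server side is an uncaught-exception handler on the writer thread. The following is a minimal sketch in plain Java, not Jersey-specific; the class name UncaughtDemo, the method runAndCapture and the thread name "sse-writer" are hypothetical and only serve the illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class UncaughtDemo {
    // Runs 'task' on a new thread and returns whatever Throwable escaped
    // its run() method, or null if the task finished normally.
    static Throwable runAndCapture(Runnable task) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(task, "sse-writer"); // hypothetical thread name
        // The handler is invoked by the dying thread itself, before it terminates.
        worker.setUncaughtExceptionHandler((thread, error) -> failure.set(error));
        worker.start();
        try {
            worker.join(); // wait until the worker, and therefore the handler, is done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        // Simulates a write() that fails with an unchecked exception.
        Throwable t = runAndCapture(() -> {
            throw new NullPointerException("simulated");
        });
        System.out.println("captured: " + t);
    }
}
```

In a real application the handler would presumably log the exception together with some request context rather than store it, which makes it easier to correlate the failure with the moment the container recycles the response.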

I ran into what seems to be a long-standing bug in Jersey's @Context injection of ExecutorService instances. In their current implementation of Sse (version 2.27),

    class JerseySse implements Sse {
        @Context
        private ExecutorService executorService;

        @Override
        public OutboundSseEvent.Builder newEventBuilder() {
            return new OutboundEvent.Builder();
        }

        @Override
        public SseBroadcaster newBroadcaster() {
            return new JerseySseBroadcaster(executorService);
        }
    }

the executorService field is never initialized, so JerseySseBroadcaster raised a NullPointerException in my case. I worked around the bug by explicitly triggering the injection.
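To check whether a privately injected field such as executorService really is null at runtime, reflection is one option. This is a minimal sketch under stated assumptions: FakeSse is a hypothetical stand-in for a class with an uninjected field, not Jersey's JerseySse, and FieldProbe.isFieldNull is a helper invented for this illustration.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in: a class whose private field is never assigned.
class FakeSse {
    private Object executorService;
}

class FieldProbe {
    // Returns true if the named field, declared directly on the target's class,
    // currently holds null.
    static boolean isFieldNull(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // the field is private
            return field.get(target) == null;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("no readable field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(FieldProbe.isFieldNull(new FakeSse(), "executorService"));
    }
}
```

Calling such a probe before and after the explicit injection would show whether the workaround actually populated the field.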

If you are using HK2 for CDI (Jersey's default), a rough sketch of a solution to the question above could look like the following:

    @Singleton
    @Path("...")
    public class JmsPublisher {
        private Sse sse;
        private SseBroadcaster broadcaster;
        private final ExecutorService executor;
        // Typed as String so that take() below compiles without a cast
        private final BlockingQueue<String> jmsMessageQueue;
        ...
        @Context
        public void setSse(Sse sse, ServiceLocator locator) {
            locator.inject(sse); // Inject sse.executorService
            this.sse = sse;
            this.broadcaster = sse.newBroadcaster();
        }
        ...
        @GET
        @Path("/stream_data")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        public void register(@Context SseEventSink eventSink) {
            broadcaster.register(eventSink);
        }
        ...
        @PostConstruct
        private void postConstruct() {
            executor.submit(() -> {
                try {
                    while(true) {
                        // Blocks until the JMS side enqueues a message
                        String message = jmsMessageQueue.take();
                        broadcaster.broadcast(sse.newEventBuilder()
                            .mediaType(MediaType.APPLICATION_JSON_TYPE)
                            .data(String.class, message)
                            .build());
                    }
                } catch(InterruptedException e) {
                    // Restore the interrupt flag and let the worker exit
                    Thread.currentThread().interrupt();
                }
            });
        }

        @PreDestroy
        private void preDestroy() {
            executor.shutdownNow(); // interrupts the worker blocked in take()
        }
    }
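The consumer loop in postConstruct can be tried in isolation, without Jersey or JMS. The sketch below is a plain-Java approximation: QueuePump and QueuePumpDemo are hypothetical names, and the Consumer passed to the constructor stands in for the broadcaster.broadcast(...) call. It shows the same pattern of blocking on take(), exiting on interrupt, and stopping the worker with shutdownNow().

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the JMS-to-SSE pump; not part of Jersey.
class QueuePump {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Consumer<String> sink; // stands in for broadcaster.broadcast(...)

    QueuePump(Consumer<String> sink) {
        this.sink = sink;
    }

    // Plays the role of the @PostConstruct method.
    void start() {
        executor.submit(() -> {
            try {
                while (true) {
                    sink.accept(queue.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        });
    }

    void publish(String message) {
        queue.add(message);
    }

    // Plays the role of the @PreDestroy method; returns true if the worker stopped.
    boolean shutdown() throws InterruptedException {
        executor.shutdownNow(); // interrupts the worker blocked in take()
        return executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}

class QueuePumpDemo {
    // Publishes the messages, waits for delivery, and returns what the sink saw.
    static List<String> run(List<String> messages) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch delivered = new CountDownLatch(messages.size());
        QueuePump pump = new QueuePump(m -> {
            received.add(m);
            delivered.countDown();
        });
        pump.start();
        messages.forEach(pump::publish);
        try {
            delivered.await(2, TimeUnit.SECONDS);
            pump.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b")));
    }
}
```

Compared with the polling loop in the question, which spins on updatesQ.isEmpty(), a blocking take() leaves the thread idle until there is something to send.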