如何正确使用ChunkedStream

这是我的用例…我有一个上游服务,通过网络发送我的Netty应用程序数据,并且该数据需要发布到连接到Netty的多个客户端。 推送到客户端的数据必须是HTTP“Transfer-Encoding:chunked”。

我找到了ChunkedStream ,虽然也许我可以创建一个PipedInputStream和一个PipedOutputStream (连接到PipedInputStream )并将ChunkedStream写入通道。 然后,当从我的上游服务收到数据时,我可以将数据写入通道的PipedOutputStream ,然后将其发送给客户端:

在channelConnected中

 PipedInputStream in = new PipedInputStream(); PipedOutputStream out = new PipedOutputStream(in); ctx.getChannel().write( new PersistentChunkedStream(in) ); 

单独的线程将数据发布到连接的通道

 ChannelBuffer buff = ChannelBuffers.copiedBuffer("FOO",CharsetUtil.UTF_8); out.write( buff.array() ); channel.get(ChunkedWriteHandler.class).resumeTransfer(); 

如果有0个字节可用,我必须扩展ChunkedStream以从nextChunk返回null (在没有线程挂起的情况下“挂起”写入),所以在写入相关通道的PipedOutputStream之后我调用resumeTransfer 。 当我调试并逐步执行代码时,我可以看到正在调用ChunkedWriteHandler flush ,它会调用:

 Channels.write(ctx, writeFuture, chunk, currentEvent.getRemoteAddress()); 

我写入PipedOutputStream,的字节数PipedOutputStream,但它从未被客户端接收过。

HTTPcurl

 ~ $ curl -vN http://localhost:8080/stream * About to connect() to localhost port 8080 (#0) * Trying 127.0.0.1... connected * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /stream HTTP/1.1 > User-Agent: curl/7.19.7 (universal-apple-darwin10.0) libcurl/7.19.7 OpenSSL/0.9.8r zlib/1.2.3 > Host: localhost:8080 > Accept: */* > < HTTP/1.1 200 OK < Transfer-Encoding: chunked < ### NOTE: NO "FOO" TRANSMIT BACK ### 

有什么想法吗? 也许有更好的方法来实现这一目标?

我想知道为什么你甚至想要使用PipedInputStream / PipedOutputStream。 我认为在没有您的数据的情况下直接调用Channel.write(..)会更干净/更容易。 请注意尽可能多地在Channel.write(..)中提交数据,因为这是一项昂贵的操作。

您可以从任何您想要的线程调用Channel.write(..),因为它是线程安全的。

只是为Norman提供的答案添加更多内容。

发送任意分块数据时,必须先发送一个新的DefaultHttpResponse(仅限一次):

 HttpResponse res = new DefaultHttpResponse(); res.setChunked(true); res.setHeader(Names.TRANSFER_ENCODING, Values.CHUNKED); channel.write(res); 

然后,只要您想要使用任意块写入通道,请调用:

 HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8))); channel.write(chunk); 

我知道这是一个老问题,但希望这有助于某人。

ChunkedStream并不意味着HTTP Chunking ……这是一个令人遗憾的命名冲突,我能说清楚。 分块流只是为了避免将整个项目加载到内存中,实际上ChunkedWriter会在每个块之后调用ChunkedStream来请求更多数据。

事实certificate,您可以使用ChunkedStream范例创建一些从标准输入流为您执行HTTP分块的内容。 下面的代码实现了ChunkedInput并接受了一个InputStream。 它还会自动附加尾随的http块以指示EOF,但是根据ChunkedInput规范只会这样做一次。

 public class HttpChunkStream implements ChunkedInput { private static final int CHUNK_SIZE = 8192; boolean eof = false; InputStream data; HttpChunkStream (InputStream data) { this.data= data; } byte[] buf = new byte[CHUNK_SIZE]; @Override public Object nextChunk() throws Exception { if (eof) return null; int b = data.read(buf); if (b==-1) { eof=true; return new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER); } DefaultHttpChunk c = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(buf,0,b)); return c; } @Override public boolean isEndOfInput() throws Exception { return eof; } @Override public boolean hasNextChunk() throws Exception { return isEndOfInput()==false; } @Override public void close() throws Exception { Closeables.closeQuietly(data); } 

}