如何正确使用ChunkedStream
这是我的用例…我有一个上游服务,通过网络发送我的Netty应用程序数据,并且该数据需要发布到连接到Netty的多个客户端。 推送到客户端的数据必须是HTTP“Transfer-Encoding:chunked”。
我找到了ChunkedStream
,虽然也许我可以创建一个PipedInputStream
和一个PipedOutputStream
(连接到PipedInputStream
)并将ChunkedStream
写入通道。 然后,当从我的上游服务收到数据时,我可以将数据写入通道的PipedOutputStream
,然后将其发送给客户端:
在channelConnected中
PipedInputStream in = new PipedInputStream(); PipedOutputStream out = new PipedOutputStream(in); ctx.getChannel().write( new PersistentChunkedStream(in) );
单独的线程将数据发布到连接的通道
ChannelBuffer buff = ChannelBuffers.copiedBuffer("FOO",CharsetUtil.UTF_8); out.write( buff.array() ); channel.get(ChunkedWriteHandler.class).resumeTransfer();
如果有0个字节可用,我必须扩展ChunkedStream
以从nextChunk
返回null
(在没有线程挂起的情况下“挂起”写入),所以在写入相关通道的PipedOutputStream
之后我调用resumeTransfer
。 当我调试并逐步执行代码时,我可以看到正在调用ChunkedWriteHandler
flush
,它会调用:
Channels.write(ctx, writeFuture, chunk, currentEvent.getRemoteAddress());
我写入PipedOutputStream,
的字节数PipedOutputStream,
但它从未被客户端接收过。
HTTPcurl
~ $ curl -vN http://localhost:8080/stream * About to connect() to localhost port 8080 (#0) * Trying 127.0.0.1... connected * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /stream HTTP/1.1 > User-Agent: curl/7.19.7 (universal-apple-darwin10.0) libcurl/7.19.7 OpenSSL/0.9.8r zlib/1.2.3 > Host: localhost:8080 > Accept: */* > < HTTP/1.1 200 OK < Transfer-Encoding: chunked < ### NOTE: NO "FOO" TRANSMIT BACK ###
有什么想法吗? 也许有更好的方法来实现这一目标?
我想知道为什么你甚至想要使用PipedInputStream / PipedOutputStream。 我认为在没有您的数据的情况下直接调用Channel.write(..)会更干净/更容易。 请注意尽可能多地在Channel.write(..)中提交数据,因为这是一项昂贵的操作。
您可以从任何您想要的线程调用Channel.write(..),因为它是线程安全的。
只是为Norman提供的答案添加更多内容。
发送任意分块数据时,必须先发送一个新的DefaultHttpResponse(仅限一次):
HttpResponse res = new DefaultHttpResponse(); res.setChunked(true); res.setHeader(Names.TRANSFER_ENCODING, Values.CHUNKED); channel.write(res);
然后,只要您想要使用任意块写入通道,请调用:
HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8))); channel.write(chunk);
我知道这是一个老问题,但希望这有助于某人。
ChunkedStream并不意味着HTTP Chunking ……这是一个令人遗憾的命名冲突,我能说清楚。 分块流只是为了避免将整个项目加载到内存中,实际上ChunkedWriter会在每个块之后调用ChunkedStream来请求更多数据。
事实certificate,您可以使用ChunkedStream范例创建一些从标准输入流为您执行HTTP分块的内容。 下面的代码实现了ChunkedInput并接受了一个InputStream。 它还会自动附加尾随的http块以指示EOF,但是根据ChunkedInput规范只会这样做一次。
public class HttpChunkStream implements ChunkedInput { private static final int CHUNK_SIZE = 8192; boolean eof = false; InputStream data; HttpChunkStream (InputStream data) { this.data= data; } byte[] buf = new byte[CHUNK_SIZE]; @Override public Object nextChunk() throws Exception { if (eof) return null; int b = data.read(buf); if (b==-1) { eof=true; return new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER); } DefaultHttpChunk c = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(buf,0,b)); return c; } @Override public boolean isEndOfInput() throws Exception { return eof; } @Override public boolean hasNextChunk() throws Exception { return isEndOfInput()==false; } @Override public void close() throws Exception { Closeables.closeQuietly(data); }
}