在Netty客户端上发送多个异步请求

首先,让我解释一下背景:

我必须创建一个客户端,它将发送许多HTTP请求来下载图像。 这些请求必须是异步的,因为只要图像完成,它就会被添加到队列然后打印到屏幕。 因为图像可能很大并且响应被分块,我的处理程序必须将它聚合到缓冲区中。

所以我按照Netty示例代码( HTTP勺子示例 )。

目前,我有三个静态Map来存储每个通道的通道ID和缓冲区/块布尔/我的最终对象。

private static final ConcurrentHashMap BUFFER_MAP = new ConcurrentHashMap(); private static final ConcurrentHashMap PACK_MAP = new ConcurrentHashMap(); private static final ConcurrentHashMap CHUNKS_MAP = new ConcurrentHashMap(); 

之后,我创建了我的引导程序客户端,并计数到countDown挂起的请求数。 当响应图像为complet时,最终队列和计数器将传递给我的Handler。

  final ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("reuseAddress", true); bootstrap.setOption("connectTimeoutMillis", 30000); final CountDownLatch latch = new CountDownLatch(downloadList.size()) { @Override public void countDown() { super.countDown(); if (getCount() <= 0) { try { queue.put(END_OF_QUEUE); bootstrap.releaseExternalResources(); } catch (InterruptedException ex) { LOGGER.log(Level.WARNING, ex.getMessage(), ex); } } } }; bootstrap.getPipeline().addLast("codec", new HttpClientCodec()); bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch)); 

之后,我为每个要下载的图像创建一个频道,当频道连接时,将创建并发送请求。 之前已经提取了主机和端口。

 for (final ImagePack pack : downloadList) { final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture cf) throws Exception { final Channel channel = future.getChannel(); PACK_MAP.put(channel.getId(), pack); final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url); request.setHeader(HttpHeaders.Names.HOST, host); request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES); if (channel.isWritable()) { channel.write(request); } } }); } 

现在,这是我的ChannelHandler,它是一个扩展SimpleChannelUpstreamHandler的内部类。 连接通道后,将创建BUFFER_MAPCHUNKS_MAP的新条目。 BUFFER_MAP包含处理程序用于聚合来自通道的图像块的所有图像缓冲区,而CHUNKS_MAP包含响应CHUNKS_MAP布尔值。 响应完成后,图像InputSteam将添加到队列中,锁存器倒计时并关闭通道。

 private class TileClientHandler extends SimpleChannelUpstreamHandler { private CancellableQueue queue; private CountDownLatch latch; public TileClientHandler(final CancellableQueue queue, final CountDownLatch latch) { this.queue = queue; this.latch = latch; } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { if(!BUFFER_MAP.contains(ctx.getChannel().getId())){ BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000)); } if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){ CHUNKS_MAP.put(ctx.getChannel().getId(), false); } } @Override public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { super.writeComplete(ctx, e); if(!BUFFER_MAP.contains(ctx.getChannel().getId())){ BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000)); } if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){ CHUNKS_MAP.put(ctx.getChannel().getId(), false); } } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { final Integer channelID = ctx.getChannel().getId(); if (!CHUNKS_MAP.get(channelID)) { final HttpResponse response = (HttpResponse) e.getMessage(); if (response.isChunked()) { CHUNKS_MAP.put(channelID, true); } else { final ChannelBuffer content = response.getContent(); if (content.readable()) { final ChannelBuffer buf = BUFFER_MAP.get(channelID); buf.writeBytes(content); BUFFER_MAP.put(channelID, buf); messageCompleted(e); } } } else { final HttpChunk chunk = (HttpChunk) e.getMessage(); if (chunk.isLast()) { CHUNKS_MAP.put(channelID, false); messageCompleted(e); } else { final ChannelBuffer buf = BUFFER_MAP.get(channelID); buf.writeBytes(chunk.getContent()); BUFFER_MAP.put(channelID, buf); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); latch.countDown(); e.getChannel().close(); } private void messageCompleted(MessageEvent e) { final Integer channelID = e.getChannel().getId(); if (queue.isCancelled()) { return; } try { final ImagePack p = PACK_MAP.get(channelID); final ChannelBuffer b = BUFFER_MAP.get(channelID); p.setBuffer(new ByteArrayInputStream(b.array())); queue.put(p.getTile()); } catch (Exception ex) { LOGGER.log(Level.WARNING, ex.getMessage(), ex); } latch.countDown(); e.getChannel().close(); } } 

我的问题是,当我执行这段代码时,我有以下例外:

  java.lang.IllegalArgumentException: invalid version format: 3!}@ at org.jboss.netty.handler.codec.http.HttpVersion.(HttpVersion.java:108) at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68) at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110) at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198) at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) java.lang.IllegalArgumentException: invalid version format: at org.jboss.netty.handler.codec.http.HttpVersion.(HttpVersion.java:108) at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68) at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110) at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198) at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360) at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595) at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82) at org.jboss.netty.channel.Channels.close(Channels.java:720) at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200) at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) 22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format: java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread. at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71) at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171) at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324) at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314) at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360) at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595) at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82) at org.jboss.netty.channel.Channels.close(Channels.java:720) at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200) at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) 

还有一些NPE出现了。

 java.lang.NullPointerException at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409) at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184) at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 

所有这些代码都适用于一个请求,但是当许多请求发送时,一些奇怪的东西会附加到缓冲区。

我在这里缺少什么想法? 谢谢。

在我的第一个版本中,我为每个请求的图像复制bootstrap / handler,它工作正常但不是很优化。

问题是你在所有频道之间共享一个HttpClientCodec。 将为所有通道克隆引导程序中指定的默认管道,因此每个通道都会看到每个处理程序的相同实例。 http编解码器是有状态的,因此您可以看到不同响应的效果混合在一起。

最简单的解决方案是将ChannelPipelineFactory传递给引导程序。 这将针对每个新通道调用,您可以使用HttpClientCodec的新实例创建管道。 没有什么可以阻止你为你创建的每个管道使用相同的TileClientHandler实例,如果这是它的工作方式。

我很好奇。 鉴于您同时发出每个请求,在HttpClientCodec上游添加HttpChunkAggregator并让Netty将所有块聚合成单个HttpResponse会不会更容易。 然后你只是从那里抓取重新组装的内容?