使用Netty的异步HTTP客户端

我是netty的新手,仍然喜欢找我的路。 我正在寻找创建一个异步工作的http客户端。 http的netty示例仅显示如何等待IO操作,而不是如何使用addListener ,因此我一直试图在过去几天解决这个问题。

我正在尝试创建一个请求类,它将处理请求的所有不同状态,包括连接,发送数据,处理响应,然后关闭连接。 为了做到这一点,我的类扩展了SimpleChannelUpstreamHandler并实现了ChannelFutureListener 。 我使用ChannelPipelineFactory将类(作为SimpleChannelUpstreamHandler )的(this)实例作为处理程序添加到管道。

连接创建如下:

this.state = State.Connecting; this.clientBootstrap.connect(this.address).addListener(this); 

然后是operationComplete方法:

 @Override public void operationComplete(ChannelFuture future) throws Exception { State oldState = this.state; if (!future.isSuccess()) { this.status = Status.Failed; future.getChannel().disconnect().addListener(this); } else if (future.isCancelled()) { this.status = Status.Canceled; future.getChannel().disconnect().addListener(this); } else switch (this.state) { case Connecting: this.state = State.Sending; Channel channel = future.getChannel(); channel.write(this.createRequest()).addListener(this); break; case Sending: this.state = State.Disconnecting; future.getChannel().disconnect().addListener(this); break; case Disconnecting: this.state = State.Closing; future.getChannel().close().addListener(this); break; case Closing: this.state = State.Finished; break; } System.out.println("request operationComplete start state: " + oldState + ", end state: " + this.state + ", status: " + this.status); } private HttpRequest createRequest() { String url = this.url.toString(); HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); request.setHeader(HttpHeaders.Names.HOST, this.url.getHost()); request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); return request; } 

该类还会覆盖messageReceived方法:

 @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { System.out.println("messageReceived"); HttpResponse response = (HttpResponse) e.getMessage(); ChannelBuffer content = response.getContent(); if (content.readable()) { System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8)); } } 

问题是我得到了这个输出:

 request operationComplete start state: Connecting, end state: Sending, status: Unknown request operationComplete start state: Sending, end state: Disconnecting, status: Unknown request operationComplete start state: Closing, end state: Finished, status: Unknown request operationComplete start state: Disconnecting, end state: Finished, status: Unknown 

正如您所看到的,由于某种原因,即使管道工厂将此类的实例添加到管道中,仍未执行messageReceived

我在这里缺少什么想法? 谢谢。


编辑

在@JestanNirojan的帮助下,我终于得到了这个工作,以防有人对该解决方案感兴趣:

 public class ClientRequest extends SimpleChannelUpstreamHandler { .... public void connect() { this.state = State.Connecting; System.out.println(this.state); this.clientBootstrap.connect(this.address); } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { this.state = State.Sending; System.out.println(this.state); ctx.getChannel().write(this.createRequest()); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { HttpResponse response = (HttpResponse) e.getMessage(); ChannelBuffer content = response.getContent(); if (content.readable()) { System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8)); } this.state = State.Disconnecting; System.out.println(this.state); } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { this.state = State.Closing; System.out.println(this.state); } @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { this.state = State.Finished; System.out.println(this.state); } private HttpRequest createRequest() { String url = this.url.toString(); HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); request.setHeader(HttpHeaders.Names.HOST, this.url.getHost()); request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); return request; } } 

您正在使用ChannelFutureListener来执行通道中的所有操作(这是错误的),并且将在调用这些通道操作后立即执行将来的侦听器。

问题是,在发送消息后,通道立即断开,处理程序无法接收稍后出现的响应消息。

  ........ case Sending: this.state = State.Disconnecting; future.getChannel().disconnect().addListener(this); break; ........ 

你不应该阻止频道未来的线程。 最好的方法是扩展SimpleChannelUpstreamHandler

  channelConnected(..) {} messageReceived(..) {} channelDisconnected(..) {} 

方法并对这些事件做出反应。 你也可以把状态保留在那个处理程序中。