如何使用netty客户端获取服务器响应

我想写一个基于netty的客户端。 它应该有方法public String send(String msg); 它应该返回来自服务器或未来的响应 – 无所谓。 它也应该是multithreading的。 喜欢这个:

public class Client { public static void main(String[] args) throws InterruptedException { Client client = new Client(); } private Channel channel; public Client() throws InterruptedException { EventLoopGroup loopGroup = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()). addLast(new StringEncoder()). addLast(new ClientHandler()); } }); channel = b.connect("localhost", 9091).sync().channel(); } public String sendMessage(String msg) { channel.writeAndFlush(msg); return ??????????; } 

}

在调用writeAndFlush()之后,我怎么能从服务器检索响应? 我该怎么办?

我也使用Netty 4.0.18.Final

为方法返回Future很简单,我们将实现以下方法签名:

 public Futute sendMessage(String msg) { 

当您熟悉异步编程结构时,相对容易做到。 为了解决设计问题,我们将执行以下步骤:

  1. 写入消息时,将Promise添加到ArrayBlockingQueue

    这将作为最近发送的消息的列表,并允许我们更改Future对象返回结果。

  2. 当消息返回到处理程序时,将其解析为Queue的头部

    这使我们能够获得正确的未来变化。

  3. 更新Promise

    我们调用promise.setSuccess()来最终设置对象的状态,这将传播回future对象。

示例代码

 public class ClientHandler extends SimpleChannelInboundHandler { private ChannelHandlerContext ctx; private BlockingQueue> messageList = new ArrayBlockingQueue<>(16); @Override public void channelActive(ChannelHandlerContext ctx) { super.channelActive(ctx); this.ctx = ctx; } @Override public void channelInactive(ChannelHandlerContext ctx) { super.channelInactive(ctx); synchronized(this){ Promise prom; Exception err = null; while((prom = messageList.poll()) != null) prom.setFailure(err != null ? err : err = new IOException("Connection lost")); messageList = null; } } public Future sendMessage(String message) { if(ctx == null) throw new IllegalStateException(); return sendMessage(message, ctx.newPromise()); } public Future sendMessage(String message, Promise prom) { synchronized(this){ if(messageList == null) { // Connection closed prom.setFailure(new IllegalStateException()); } else if(messageList.offer(prom)) { // Connection open and message accepted ctx.writeAndFlush(message).addListener(); } else { // Connection open and message rejected prom.setFailure(new BufferOverflowException()); } return prom; } } @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) { synchronized(this){ if(messageList != null) { messageList.poll().setSuccess(msg); } } } } 

文档细分

  • private ChannelHandlerContext ctx;

    用于存储我们对ChannelHandlerContext的引用,我们使用它来创建promises

  • private BlockingQueue> messageList = new ArrayBlockingQueue<>();

    我们将过去的消息保留在此列表中,以便我们可以更改未来的结果

  • public void channelActive(ChannelHandlerContext ctx)

    当连接变为活动状态时由netty调用。 在这里初始化变量。

  • public void channelInactive(ChannelHandlerContext ctx)

    当连接变为非活动状态时由netty调用,由于错误或正常连接关闭。

  • protected void messageReceived(ChannelHandlerContext ctx, String msg)

    当新消息到达时由netty调用,这里挑选队列的头部,然后我们调用setsuccess。

警告提醒

使用期货时,有一件事你需要注意,如果未来尚未完成,请不要从1个网络线程调用get(),不遵循这个简单规则会导致死锁或BlockingOperationException

您可以在netty项目中找到示例。 我们可以将结果保存到最后一个处理程序的自定义字段中。 在下面的代码中,我们想要的是handler.getFactorial()。

参考http://www.lookatsrc.com/source/io/netty/example/factorial/FactorialClient.java?a=io.netty:netty-all

FactorialClient.java

 public final class FactorialClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8322")); static final int COUNT = Integer.parseInt(System.getProperty("count", "1000")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new FactorialClientInitializer(sslCtx)); // Make a new connection. ChannelFuture f = b.connect(HOST, PORT).sync(); // Get the handler instance to retrieve the answer. FactorialClientHandler handler = (FactorialClientHandler) f.channel().pipeline().last(); // Print out the answer. System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial()); } finally { group.shutdownGracefully(); } } } public class FactorialClientHandler extends SimpleChannelInboundHandler { private ChannelHandlerContext ctx; private int receivedMessages; private int next = 1; final BlockingQueue answer = new LinkedBlockingQueue(); public BigInteger getFactorial() { boolean interrupted = false; try { for (;;) { try { return answer.take(); } catch (InterruptedException ignore) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } @Override public void channelActive(ChannelHandlerContext ctx) { this.ctx = ctx; sendNumbers(); } @Override public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) { receivedMessages ++; if (receivedMessages == FactorialClient.COUNT) { // Offer the answer after closing the connection. ctx.channel().close().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { boolean offered = answer.offer(msg); assert offered; } }); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } private void sendNumbers() { // Do not send more than 4096 numbers. ChannelFuture future = null; for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) { future = ctx.write(Integer.valueOf(next)); next++; } if (next <= FactorialClient.COUNT) { assert future != null; future.addListener(numberSender); } ctx.flush(); } private final ChannelFutureListener numberSender = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { sendNumbers(); } else { future.cause().printStackTrace(); future.channel().close(); } } }; } 

调用channel.writeAndFlush(msg); 已经返回一个ChannelFuture。 要处理此方法调用的结果,您可以像以下一样向未来添加一个侦听器:

 future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { // Perform post-closure operation // ... } }); 

(这取自Netty文档,请参阅: Netty doc )