在Netty连接关闭后重新连接的最佳方法是什么

简单场景:

  1. 较低级别的A类,它扩展了SimpleChannelUpstreamHandler。 这个类是发送消息并收到响应的主力。
  2. 顶级B类,系统的其他部分可以使用它来发送和接收消息(可以模拟同步和异步)。 此类创建ClientBootstrap,设置管道工厂,调用bootstrap.connect()并最终获得A类的句柄/引用,用于发送和接收消息。 就像是:

    ChannelFuture future = bootstrap.connect(); Channel channel = future.awaitUninterruptibly().getChannel(); 

    处理程序= channel.getPipeline()。get(A.class);

我知道在A类中,我可以覆盖public void channelClosed(ChannelHandlerContext ctx,ChannelStateEvent e); 这样当远程服务器关闭时,我会收到通知。

由于在关闭通道之后,B类中的原始类A引用(上面的处理程序)不再有效,因此我需要用新引用替换它。

理想情况下,我希望类A具有在上面覆盖的channelClosed方法中通知类B的机制,因此可以在类B中再次调用bootstrap.connect。一种方法是在类A中引用引用类B的引用为此,我需要将B类引用传递给PipelineFactory,然后让PipelineFactory将B的引用传递给A.

任何其他更简单的方法来实现同样的事情?

谢谢,

Channel.closeFuture()返回一个ChannelFuture ,它将在通道关闭时通知您。 您可以在B中为将来添加ChannelFutureListener ,以便您可以在那里进行另一次连接尝试。

您可能希望重复此操作,直到最后连接尝试成功为止:

 private void doConnect() { Bootstrap b = ...; b.connect().addListener((ChannelFuture f) -> { if (!f.isSuccess()) { long nextRetryDelay = nextRetryDelay(...); f.channel().eventLoop().schedule(nextRetryDelay, ..., () -> { doConnect(); }); // or you can give up at some point by just doing nothing. } }); } 

经过长时间的尝试,我手动实现了这个function。 线程是一个不同的野兽。 无论如何,这就是我如何应对这一挑战。

这是客户:

 import java.util.logging.Logger; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class NettyClient implements Runnable { Logger logger = Logger.getLogger(NettyClient.class.getName()); boolean done = false; public void run() { String host = "10.99.1.249"; int port = 7973; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder(), new StringDecoder(), new DelimiterBasedFrameDecoder(8192, false, Unpooled.wrappedBuffer("".getBytes())), new ClientHandler(getClient())); } }); ChannelFuture f = b.connect(host, port).sync(); Channel ch = f.channel(); ChannelFuture f5 = b.bind(ch.localAddress()); while(!done) { Thread.sleep(5000); } System.out.println("Done."); } catch (InterruptedException e) { logger.info(e.getMessage());; } finally { workerGroup.shutdownGracefully(); } } public boolean isDone() { return done; } public void setDone(boolean done) { this.done = done; } private NettyClient getClient() { return this; } } 

这是相应的ClientHandler,它可以完成重新连接的实际工作:

 import java.net.SocketAddress; import java.util.logging.Logger; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ClientHandler extends ChannelInboundHandlerAdapter { Logger logger = Logger.getLogger(ClientHandler.class.getName()); private NettyClient client; public ClientHandler(NettyClient client) { super(); this.client = client; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); ChannelFuture f2 = ch.writeAndFlush(""); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // invoked when there is a problem connecting to remote server; needs to be enhanced to properly handle thread shutdown client.setDone(true); SocketAddress sa = ctx.channel().remoteAddress(); while(true) { try { NettyClient nc = new NettyClient(); nc.run(); break; } catch(Exception ex) { logger.info("Reconnecting..."); } Thread.sleep(5000); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // incoming notifications are handled here System.out.println((String)msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 

实际的重新连接在channelInactive()方法中处理。