Netty Nio java中的通信

我想在Netty nio中创建一个具有两个客户端和一个服务器的通信系统。 更具体地说,首先,我希望当两个客户端与服务器连接以从服务器发送消息时,之后能够在两个客户端之间传递数据。 我正在使用此示例提供的代码 。 我在代码中的修改可以在这里找到: 链接

似乎serverHandler中的channelRead在第一个客户端被连接时工作,因此它总是返回1,但是当连接第二个客户端时不会更改为2.当两个客户端连接到服务器时,如何从服务器正确检查? 如何从客户端的主要function中动态读取此值? 那么这是让两个客户沟通的最佳方式?

EDIT1:显然似乎客户端服务正在运行并直接关闭,因此每次运行新的NettyClient时都会连接,但之后关闭连接。 所以计数器总是从0到1。 正如我在下面的评论中所建议的那样,我在同一个端口使用telnet测试它,然后计数器似乎正常增加,但NettyClient服务没有。

EDIT2:似乎我得到的问题来自future.addListener(ChannelFutureListener.CLOSE); 它位于ProcessingHandler class channelRead中。 当我评论它时,似乎代码工作。 但是,我不确定评论的后果是什么。 此外,我希望从我的客户端的主要function检查返回消息何时是特定的两个。 如何,我可以创建一个等待来自服务器的特定消息的方法,同时它阻止主要function。

  static EventLoopGroup workerGroup = new NioEventLoopGroup(); static Promise promise = workerGroup.next().newPromise(); public static void callClient() throws Exception { String host = "localhost"; int port = 8080; try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise)); } }); ChannelFuture f = b.connect(host, port).sync(); } finally { //workerGroup.shutdownGracefully(); } } 

我希望在main函数内部调用方法并返回结果,当它为2时继续使用主要function。 但是,我无法在内部调用callClient,因为它将在同一个客户端运行多次。

  callBack(); while (true) { Object msg = promise.get(); System.out.println("Case1: the connected clients is not two"); int ret = Integer.parseInt(msg.toString()); if (ret == 2){ break; } } System.out.println("Case2: the connected clients is two"); // proceed with the main functionality 

如何更新第一个客户端的promise变量。 当我运行两个客户端时,对于第一个客户端,我总是收到消息:

案例1:连接的客户端不是两个

似乎承诺没有正常更新,而对于第二个客户我总是收到:

案例2:连接的客户端是两个

如果我的内存是正确的,ChannelHandlerContext是每个通道一个,并且它的管道中可以有多个ChannelHandler。 您的channels变量是处理程序类的实例变量。 然后为每个连接创建一个新的 ProcessingHandler实例。 因此,每次初始化时,每个将在channels变量中只有一个连接 – 它是为其创建的。

请参阅服务器代码(NettyServer.java)中initChannel函数中的new ProcessingHandler() )。

您可以将channels变量设置为静态,以便在ProcessingHandler实例之间共享它。 或者您可以在其他地方创建单个ProcessingHandler实例(例如,作为run()函数中的局部变量),然后将该实例传递给addLast调用而不是new ProcessingHandler()

为什么ChannelGroup频道的大小始终是一个。 即使我连接更多客户?

因为为每个新Channel (客户端)调用子ChannelInitializer 。 在那里,您正在创建ProcessingHandler新实例,因此每个通道都会看到自己的ChannelGroup实例。

解决方案1 ​​ – 通道属性

使用属性并将其与Channel关联。

在某处创建属性(让我们说在Constants类中):

 public static final AttributeKey CH_GRP_ATTR = AttributeKey.valueOf(SomeClass.class.getName()); 

现在,创建将由ProcessingHandler的所有实例使用的ChannelGroup:

 final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 

在NettyServer中更新您的子ChannelInitializer

 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler()); ch.attr(Constants.CH_GRP_ATTR).set(channels); } 

现在您可以在处理程序中访问ChannelGroup的实例,如下所示:

 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final ChannelGroup channels = ctx.channel().attr(Constants.CH_GRP_ATTR).get(); channels.add(ctx.channel()); 

这将起作用,因为每次新客户端连接时,ChannelInitializer都将被调用,同时引用ChannelGroup

解决方案2 – 静态字段

如果将ChannelGroup声明为static,则所有类实例都将看到相同的ChannelGroup实例:

 private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 

解决方案3 – 传播共享实例

将参数引入ProcessingHandler构造函数:

 private final ChannelGroup channels; public ProcessingHandler(ChannelGroup chg) { this.channels = chg; } 

现在,在NettyServer类中创建ChannelGroup的实例并将其传播到ProcessingHandler构造函数:

 final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler(channels)); // <- here } 

就个人而言,我会选择第一个解决方案,因为

  • 它明确地将ChannelGroup与Channel上下文相关联
  • 您可以在其他处理程序中访问相同的ChannelGroup
  • 您可以拥有多个服务器实例(在同一JVM中的不同端口上运行)