如何知道Netty ByteBuf中是否没有可读取的数据?

我是Netty的新手。 文件传输存在一个问题让我困惑了好几天。 我想将图像文件从客户端发送到服务器。

以下代码是可执行的。 但只有我强行关闭服务器才能正常打开收到的图像文件。 否则,它显示“ 看起来您没有权限查看此文件。请检查权限并重试 ”。 因此,当ByteBuf中没有数据使用ByteBuf.isReadable()时 ,我想关闭fileoutputstream,但ServerHandler中方法channelRead中的else块永远不会到达。 这毫无用处。

此外,如果发送文本文件,它可以在服务器处于活动状态时正常打开。 我不希望每次转移后都关闭服务器 。 请给我一些解决方案的建议。

这是FileClientHandler

public class FileClientHandler extends ChannelInboundHandlerAdapter { private int readLength = 8; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { sendFile(ctx.channel()); } private void sendFile(Channel channel) throws IOException { File file = new File("C:\\Users\\xxx\\Desktop\\1.png"); FileInputStream fis = new FileInputStream(file); BufferedInputStream bis = new BufferedInputStream(fis); for (;;) { byte[] bytes = new byte[readLength]; int readNum = bis.read(bytes, 0, readLength); // System.out.println(readNum); if (readNum == -1) { bis.close(); fis.close(); return; } sendToServer(bytes, channel, readNum); } } private void sendToServer(byte[] bytes, Channel channel, int length) throws IOException { channel.writeAndFlush(Unpooled.copiedBuffer(bytes, 0, length)); } } 

这是FileServerHandler

 public class FileServerHandler extends ChannelInboundHandlerAdapter { private File file = new File("C:\\Users\\xxx\\Desktop\\2.png"); private FileOutputStream fos; public FileServerHandler() { try { if (!file.exists()) { file.createNewFile(); } else { file.delete(); file.createNewFile(); } fos = new FileOutputStream(file); } catch (IOException e) { e.printStackTrace(); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; if (buf.isReadable()) { buf.readBytes(fos, buf.readableBytes()); fos.flush(); } else { System.out.println("I want to close fileoutputstream!"); buf.release(); fos.flush(); fos.close(); } } catch (Exception e) { e.printStackTrace(); } } } 

修复服务器端

在Netty世界中,有多个“事件”:

  • channelActive
  • channelRead
  • channelReadComplete
  • channelInactive
  • exceptionCaught
  • 更多…

在这些“事件”中,您可能已经知道channelRead做了什么(因为您使用它),但您似乎需要的另一个是channelInactive 。 当另一个端点关闭连接时调用此端口,您可以像这样使用它:

 @Override public void channelInactive(ctx) { System.out.println("I want to close fileoutputstream!"); fos.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; // if (buf.isReadable()) { // a buf should always be readable here buf.readBytes(fos, buf.readableBytes()); // fos.flush(); // flushing is always done when closing //} else { // System.out.println("I want to close fileoutputstream!"); // buf.release(); // Should be placed in the finally block // fos.flush(); // fos.close(); //} } catch (Exception e) { e.printStackTrace(); } finally { buf.release(); // Should always be done, even if writing to the file fails } } 

但是,服务器如何知道连接已关闭? 目前客户端没有关闭服务器,而是继续在后台运行,永远保持连接活动。

修复客户端

要正确关闭来自客户端的连接,我们需要调用channel.close() ,但是,我们不能在返回行之前直接插入它,因为这会导致发送数据和关闭网络层中的连接之间的竞争条件,可能会丢失数据。

为了正确处理这些条件,Netty使用Future系统,该系统允许代码在异步操作发生后处理事件。

幸运的是,Netty已经为此提供了解决方案 ,我们只需将其连接起来。 要将此解决方案连接到我们的代码,我们必须跟踪Netty写入方法发出的最新ChannelFuture

要正确实现此解决方案,我们更改sendToServer以返回write方法的结果:

 private ChannelFuture sendToServer(byte[] bytes, Channel channel, int length) throws IOException { return channel.writeAndFlush(Unpooled.copiedBuffer(bytes, 0, length)); } 

然后我们保持跟踪这个返回值,并在我们想要关闭连接时添加一个包含Netty构建的侦听器:

 ChannelFuture lastFuture = null; for (;;) { byte[] bytes = new byte[readLength]; int readNum = bis.read(bytes, 0, readLength); // System.out.println(readNum); if (readNum == -1) { bis.close(); fis.close(); if(lastFuture == null) { // When our file is 0 bytes long, this is true channel.close(); } else { lastFuture.addListener(ChannelFutureListener.CLOSE); } return; } lastFuture = sendToServer(bytes, channel, readNum); }