Sending a file from server to client using Netty.io

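I am trying to send a file from the server to the client, at the client's request. The client names the file in a FileRequestProtocol and sends it to the server; the server adds the file size to the FileRequestProtocol and returns it to the client.

The client then adds a new FileChunkReqWriteHandler, carrying the correct file size, to its pipeline.

The server creates a new ChunkedFileServerHandler with the context and the requested file and tries to send the file, but FileChunkReqWriteHandler never reads any bytes from the channel.

What am I doing wrong here?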

Log

    INFO ProtocolHeadHandler:48 - Client send ProtocolHead [version=1, jobType=FILEREQUEST]
    INFO ProtocolHeadServerHandler:36 - Server receive ProtocolHead [version=1, jobType=FILEREQUEST]
    INFO ProtocolHeadHandler:57 - Client ProtocolHead equals, Send Protocol FileRequestProtocol [filePath=test.jpg, fileSize=0]
    INFO FileRequestServerHandler:42 - Server new FileRequest FileRequestProtocol [filePath=test.jpg, fileSize=0]
    INFO FileRequestHandler:41 - Client receives FileRequestProtocol [filePath=test.jpg, fileSize=174878]
    INFO ChunkedFileServerHandler:39 - New ChunkedFileServerHandler
    INFO FileChunkReqWriteHandler:20 - New ChunkedFile Handler FileRequestProtocol [filePath=test.jpg, fileSize=174878]

Client

FileRequestHandler.java

    public class FileRequestHandler extends SimpleChannelInboundHandler<FileRequestProtocol> {

        private Logger logger = Logger.getLogger(this.getClass());

        public FileRequestHandler() {
        }

        @Override
        public void channelRead0(ChannelHandlerContext ctx, FileRequestProtocol msg) {
            logger.info("Client receives " + msg);
            ReferenceCountUtil.release(msg);
            ctx.channel().pipeline().addLast(new FileChunkReqWriteHandler(msg));
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            logger.info("Client read complete");
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

FileChunkReqWriteHandler.java

    public class FileChunkReqWriteHandler extends SimpleChannelInboundHandler<ChunkedFile> {

        FileRequestProtocol fileRequestProtocol;
        private Logger logger = Logger.getLogger(this.getClass());

        public FileChunkReqWriteHandler(FileRequestProtocol msg) {
            this.fileRequestProtocol = msg;
            logger.info("New ChunkedFile Handler " + msg);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            logger.info("in channel active method");
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            if (ctx.channel().isActive()) {
                ctx.writeAndFlush("ERR: " + cause.getClass().getSimpleName() + ": "
                        + cause.getMessage() + '\n').addListener(ChannelFutureListener.CLOSE);
            }
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ChunkedFile msg) throws Exception {
            logger.info("in channelRead0");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("channelRead");
            ByteBuf buf = (ByteBuf) msg;
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            if (buf.readableBytes() >= this.fileRequestProtocol.getFileSize()) {
                logger.info("received all data");
            }
        }
    }

Server

FileRequestServerHandler.java

    public class FileRequestServerHandler extends SimpleChannelInboundHandler<FileRequestProtocol> {

        private File f;
        private Logger logger = Logger.getLogger(this.getClass());

        @Override
        public void channelRead0(ChannelHandlerContext ctx, FileRequestProtocol fileRequest) {
            logger.info("Server new FileRequest " + fileRequest);
            f = new File(fileRequest.getFilePath());
            fileRequest.setFileSize(f.length());
            ctx.writeAndFlush(fileRequest);
            new ChunkedFileServerHandler(ctx, f);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            logger.info("Server read complete");
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

ChunkedFileServerHandler.java

    public class ChunkedFileServerHandler extends ChunkedWriteHandler {

        private Logger logger = Logger.getLogger(this.getClass());
        private File file;

        public ChunkedFileServerHandler(ChannelHandlerContext ctx, File file) {
            this.file = file;
            logger.info("New ChunkedFileServerHandler");
            ChunkedFile chunkedFile;
            try {
                chunkedFile = new ChunkedFile(this.file);
                ctx.writeAndFlush(chunkedFile);
                ctx.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            logger.info("FILE WRITE GETS ACTIVE");
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

Update

    public class ServerInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast("encoder", new ObjectEncoder());
            p.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
            p.addLast("protocolhead", new ProtocolHeadServerHandler());
            p.addLast("filerequestserverhandler", new FileRequestServerHandler());
            p.addLast("chunkedfileserver", new ChunkedFileServerHandler());
        }
    }

Server startup

    public void startUp() {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerInitializer());

            // Bind and start to accept incoming connections.
            b.bind(this.port).sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

I can see two issues:

1) You should not create a new handler from inside your own handler. Create the ChunkedFile directly and write it instead:

    public class FileRequestServerHandler extends SimpleChannelInboundHandler<FileRequestProtocol> {

        private File f;
        private Logger logger = Logger.getLogger(this.getClass());

        @Override
        public void channelRead0(ChannelHandlerContext ctx, FileRequestProtocol fileRequest) throws Exception {
            logger.info("Server new FileRequest " + fileRequest);
            f = new File(fileRequest.getFilePath());
            fileRequest.setFileSize(f.length());
            ctx.writeAndFlush(fileRequest);

            // Directly make your ChunkedFile here instead of creating a sub handler.
            // The ChunkedFile constructor can throw IOException, hence "throws Exception" above.
            ChunkedFile chunkedFile = new ChunkedFile(f);
            ctx.writeAndFlush(chunkedFile); // needs a specific handler (see point 2)

            // Don't create such a handler: new ChunkedFileServerHandler(ctx, f);
        }
    }

2) Since you are writing a ChunkedInput (here a ChunkedFile), you must have a ChunkedWriteHandler in your pipeline before your own handler, so your initializer could look like this:

    public class ServerInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast("encoder", new ObjectEncoder());
            p.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
            p.addLast("chunkedWriteHandler", new ChunkedWriteHandler()); // added
            p.addLast("protocolhead", new ProtocolHeadServerHandler());
            p.addLast("filerequestserverhandler", new FileRequestServerHandler());
            // removed: p.addLast("chunkedfileserver", new ChunkedFileServerHandler());
        }
    }

The position of the ChunkedWriteHandler can be changed, but it must always come before the handler in which you write the ChunkedFile.
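To give a feel for why a dedicated handler is needed, here is a rough plain-Java model of what happens to a ChunkedFile on the way out: it is not written as one message but cut into a series of small buffers. This is only a sketch, not Netty's implementation. The class name `ChunkingSketch` and the method `readChunks` are made up for illustration, and the 8192-byte chunk size is assumed to match ChunkedFile's default.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; it does not use or reproduce Netty code.
class ChunkingSketch {

    // Assumption: same value as ChunkedFile's default chunk size.
    static final int CHUNK_SIZE = 8192;

    // Reads the stream in pieces of at most chunkSize bytes, one piece per read call.
    static List<byte[]> readChunks(InputStream in, int chunkSize) throws IOException {
        List<byte[]> chunks = new ArrayList<>();
        byte[] buffer = new byte[chunkSize];
        int read;
        while ((read = in.read(buffer, 0, chunkSize)) > 0) {
            chunks.add(Arrays.copyOf(buffer, read));
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for test.jpg: same size as in the log, content irrelevant here.
        byte[] fakeFile = new byte[174878];
        List<byte[]> chunks = readChunks(new ByteArrayInputStream(fakeFile), CHUNK_SIZE);
        System.out.println("chunks: " + chunks.size());
        System.out.println("last chunk: " + chunks.get(chunks.size() - 1).length + " bytes");
    }
}
```

So the receiving side should expect many small ByteBufs rather than one, and needs to keep a running total of the bytes received to compare against the announced file size.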

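3) A final note: take a careful look at your encoder/decoder (ObjectEncoder / ObjectDecoder), since I am not 100% sure they can handle writing/reading the raw ByteBufs of the file. It might work, or not…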
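To make the concern in point 3 a bit more concrete: as far as I know, ObjectEncoder puts a 4-byte length in front of each serialized object and ObjectDecoder is a length-field based frame decoder that reads that prefix back. Raw file chunks carry no such prefix, so a decoder of that kind would take the first four bytes of the file as a frame length. The plain-Java sketch below shows what that number looks like for a typical JPEG header. The class `FramingSketch` and the method `lengthPrefix` are hypothetical names for illustration; this is not Netty code.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration class; it only mimics reading a 4-byte length prefix.
class FramingSketch {

    // Interprets the first four bytes as an unsigned big-endian length,
    // the way a decoder with a 4-byte length field would.
    static long lengthPrefix(byte[] data) {
        return ByteBuffer.wrap(data, 0, 4).getInt() & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        // Typical first bytes of a JPEG file: SOI marker (FF D8) followed by an APP0 marker (FF E0).
        byte[] jpegStart = {(byte) 0xFF, (byte) 0xD8, (byte) 0xFF, (byte) 0xE0};
        // Prints 4292411360, i.e. a "frame" of roughly 4 GB.
        System.out.println(lengthPrefix(jpegStart));
    }
}
```

A frame length like that would most likely be rejected as too long, or the decoder would wait for data that never arrives. If that is what happens on your client, one option is to take the ObjectDecoder out of the client pipeline once the FileRequestProtocol reply has been received, so that the following bytes reach your handler as plain ByteBufs.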