netty ChannelInboundHandlerAdapter在~1500字节处削减帧

我已经实现了一个服务器应用程序,它使用netty框架使用ChannelInblundHandlerAdapter读取传入的字节。

我在标题中显示的问题是,我不定期地从客户端获取内容,我认为这样的内容在~1.500字节后被删除。 例如:在这种情况下,我应该收到一个大的JSON数组。 因为它被剪切了我无法解析它。

在使用之前,我尝试使用管道中的附加ByteToMessageDecoder通道对消息进行解码。 但这并不能解决问题。 我在JSON中没有分隔符,我可以检查并再次将两个(或更多)部分粘在一起。

这是我的管道配置:

ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new IdleStateHandler(45,0,0)); ch.pipeline().addLast(new MyByteToMessageDecoder()); ch.pipeline().addLast(new GatewayCommunicationHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_RCVBUF, 8192) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(8192)) .childOption(ChannelOption.SO_KEEPALIVE, true); initRestServer(); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(Config.gatewayPort).sync(); f.channel().closeFuture().sync(); 

那就是我的ByteToMessageDecoder :(我知道它很乱,但我不知道如何在我的情况下处理它)

 public class MyByteToMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { byte[] receivedBytes = new byte[in.readableBytes()]; in.getBytes(in.readerIndex(), receivedBytes); if (receivedBytes[in.readableBytes()-1] != (byte) 0) { out.add(receivedBytes); return; } int lenForOutBytes = 0; for (Object o : out) { byte[] bytes = (byte[]) o; lenForOutBytes += bytes.length; } byte[] outBytes = new byte[lenForOutBytes]; for (Object o : out) { byte[] bytes = (byte[]) o; if (out.size() == 1) { outBytes = (byte[]) out.get(0); } else { int i = 0; for (int j = 0; j < bytes.length; j++) { outBytes[i + j] = bytes[j]; } i += bytes.length; } } ctx.fireChannelRead(outBytes); in.resetReaderIndex(); } ... 

有没有其他人有这样的问题。

谢谢你的回复

Br Joe

我已经看到这个问题经常发生,所以我故意比平时更广泛

出现此问题的原因是TCP是基于流的,而不是基于数据包的。

这基本上发生了:

  1. [client]想要发送10k字节的数据
  2. [client]将数据发送到TCP层
  3. [客户端] TCP层拆分数据包,它知道最大数据包大小为1500(这几乎是所有网络使用的默认MTU)
  4. [client]客户端向服务器发送包含40个字节作为头的数据包,以及1460个字节作为数据
  5. [服务器] Netty收到第一个数据包,并直接调用你的函数,第一个数据包包含1460个字节的数据
  6. [服务器]在您的function需要进行剩余数据时(初始数据 – 1260)

所以解决这个问题,有多种方法

预先添加长度为的消息:

虽然这通常是解决数据包的最简单方法,但它在同时处理小型和大型消息时效率最低。 这也需要更改协议。

基本思想是在发送数据包之前预先设置长度,这样就可以正确地分割消息

好处

  • 无需循环数据以过滤掉字符或阻止禁止字符
  • 如果您的网络中有中继系统,则不必对消息边界进行任何硬解析

缺点

  • 消息的长度必须是有利的,在长消息中,这是内存扩展

怎么样?

如果你使用一个标准的整数字段,这很简单,因为Netty为此构建了类:

  • LengthFieldBasedFrameDecoder
  • LengthFieldPrepender

这在管道中以下列方式使用

 // int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 4, 0, 2, 0, 2)); // int lengthFieldLength, int lengthAdjustment pipeline.addLast(new LengthFieldPrepender(2, 0)); 

这基本上构成了如下所示的数据包:

你发送:

 DATA: 12B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21 |Hello World! | +--------+-------------------------------------------------+----------------+ 

LengthFieldPrepender将其转换为:

 DATA: 14B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 00 0c 48 65 6c 6c 6f 20 57 6f 72 6c 64 21 |..Hello World! | +--------+-------------------------------------------------+----------------+ 

然后,当您收到消息时, LengthFieldBasedFrameDecoder解码为:

 DATA: 12B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21 |Hello World! | +--------+-------------------------------------------------+----------------+ 

在简单分隔符上拆分消息

有些协议采用不同的方法,而不是在修复长度上拆分,它们在分隔符上拆分。 一个快速的方法是Java中的字符串以"" ,文本文件中的行结束于换行符,自然文本中的段落以双换行结束。

好处

  • 如果您知道某些数据不包含字符,则相对容易生成,例如JSON通常不包含空格,因此通过空格分隔消息很容易。
  • 易于使用脚本语言实现,因为不需要任何状态

缺点

  • 与框架角色的冲突可能会使邮件大小膨胀
  • 长度未提前知道,因此要么在代码中设置硬编码限制,要么继续读取直到内存不足或数据结束
  • 即使您对数据包不感兴趣,也需要读取每个字符

怎么样?

从Netty发送消息时,您需要手动将分隔符添加到消息本身,接收时可以使用DelimiterBasedFrameDecoder将传入的流解码为消息。

示例管道:

这在管道中以下列方式使用

 // int maxFrameLength, ByteBuf... delimiters pipeline.addLast(1024 * 4, DelimiterBasedFrameDecoder(Delimiters.lineDelimiter())); 

发送消息时,您需要手动添加分隔符:

 DATA: 14B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21 0d 0a |Hello World!.. | +--------+-------------------------------------------------+----------------+ 

接收消息时, DelimiterBasedFrameDecoder会将消息转换为帧:

 DATA: 12B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21 |Hello World! | +--------+-------------------------------------------------+----------------+ 

拆分复杂的业务分隔符

并非所有框架都很简单,如果避免使用某些解决方案实际上是最好的,但有时候,你真的需要做一些肮脏的工作。

好处

  • 可以虚拟处理所有现有的数据结构
  • 无需修改协议

缺点

  • 通常你必须检查每个字节
  • 代码很难遵循
  • 快速解决方案可以提供它认为格式不正确的输入的奇怪错误

这属于两类:

  • 现有的解码器
  • 模式检测

现有的解码器

使用这些解决方案,您基本上可以使用来自其他框架的现有解码器来解析数据包,并检测其处理中的故障。

使用GSON和ReplayingDecoder的示例:

 public class GSONDecoder extends ReplayingDecoder { Gson gson = new GsonBuilder().create(); protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) throws Exception { out.add(gson.fromJson(new ByteBufInputStream(buf, false), Object.class)); } } 

模式检测

如果要使用模式检测方法,则需要了解协议。 让我们为JSON制作一个模式检测解码器。

基于JSON的结构,让我们做出以下假设:

  1. JSON基于匹配{} ,和[]
  2. 匹配{}之间的对应应该被忽略
  3. "当一个\前面应该被忽略
  4. 如果从左到右进行解析,则应该在\前面加一个\来忽略

基于这些属性,让我们根据这些假设制作一个ByteToMessageDecoder

 public static class JSONDecoder extends ByteToMessageDecoder { // Notice, this class is designed for JSON without a charset definition at the start, adding this is hard as we basicly have to call differend @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { in.markReaderIndex(); int fromIndex = in.readerIndex(); int unclosedCurlyBracketsSeen = 0; boolean inQuotedSection = false; boolean nonWhitespaceSeen = false; boolean slashSeen = false; while (in.isReadable()) { boolean newSlashSeenState = false; byte character = in.readByte(); if (character == '{' && !inQuotedSection) { unclosedCurlyBracketsSeen++; } if (character == '}' && !inQuotedSection) { unclosedCurlyBracketsSeen--; } if (character == '[' && !inQuotedSection) { unclosedCurlyBracketsSeen++; } if (character == ']' && !inQuotedSection) { unclosedCurlyBracketsSeen--; } if (character == '"' && !slashSeen) { inQuotedSection = !inQuotedSection; } if (character == '\\' && !slashSeen) { newSlashSeenState = true; } if (!Character.isWhitespace(character)) { nonWhitespaceSeen = true; } slashSeen = newSlashSeenState; if(unclosedCurlyBracketsSeen == 0 && nonWhitespaceSeen) { int targetIndex = in.readerIndex(); out.add(in.slice(fromIndex, targetIndex - fromIndex).retain()); return; } } // End of stream reached, but our JSON is not complete, reset our progress! in.resetReaderIndex(); } } 

接收消息时,这是它的工作原理:

 DATA: 35B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 68 69 21 22 2c 22 53 74 72 69 6e 67 3a 20 |{"hi!","String: | |00000010| 5c 22 48 69 5c 22 22 7d 20 20 7b 22 73 6c 61 73 |\"Hi\""} {"slas| |00000020| 68 22 3a |h": | +--------+-------------------------------------------------+----------------+ DATA: 34B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 22 5c 5c 22 7d 7b 22 4e 65 73 74 65 64 3a 22 3a |"\\"}{"Nested:":| |00000010| 7b 22 64 65 65 70 65 72 22 3a 7b 22 6f 6b 22 7d |{"deeper":{"ok"}| |00000020| 7d 7d |}} | +--------+-------------------------------------------------+----------------+ 

如您所见,我们收到了2条消息,其中1条甚至在2个“虚拟TCP”数据包之间分段,这由我们的“JSON解码器”转换为以下ByteBuf数据包:

 DATA: 24B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 68 69 21 22 2c 22 53 74 72 69 6e 67 3a 20 |{"hi!","String: | |00000010| 5c 22 48 69 5c 22 22 7d |\"Hi\""} | +--------+-------------------------------------------------+----------------+ DATA: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 20 20 7b 22 73 6c 61 73 68 22 3a 22 5c 5c 22 7d | {"slash":"\\"}| +--------+-------------------------------------------------+----------------+ DATA: 29B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 abcdef | +--------+-------------------------------------------------+----------------+ |00000000| 7b 22 4e 65 73 74 65 64 3a 22 3a 7b 22 64 65 65 |{"Nested:":{"dee| |00000010| 70 65 72 22 3a 7b 22 6f 6b 22 7d 7d 7d |per":{"ok"}}} | +--------+-------------------------------------------------+----------------+