Netty IllegalReferenceCountException

This is my own answer. While my business logic was not at fault, it turned out I wasn't actually exercising a Netty ByteBuf. Once I updated my test code to use ByteBuf, I ran into an endless stream of IllegalReferenceCountException. I admit to being a Netty novice, but that doesn't justify going back to the era of manual resource allocation and release. GC was invented to avoid exactly this mess. Disco, anyone? How about bell bottoms?
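
For context, here is a minimal sketch (not from my decoder, plain Netty API only) of the reference-counting contract behind this exception: a ByteBuf starts life with refCnt 1, retain() increments it, release() decrements it, and any access after the count reaches 0 fails with IllegalReferenceCountException.

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class RefCountDemo {
        public static void main(String[] args) {
            ByteBuf buf = Unpooled.buffer(4);    // refCnt == 1 on allocation
            buf.writeByte('a');

            buf.retain();                        // refCnt == 2
            buf.release();                       // refCnt == 1, still usable
            buf.release();                       // refCnt == 0, buffer is deallocated

            System.out.println(buf.refCnt());    // prints 0
            buf.getByte(0);                      // throws IllegalReferenceCountException: refCnt: 0
        }
    }

With that in mind, here are the decoder and the test that reproduced the problem: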

    public class StringDecoder extends AbstractDecoder<String> {
        private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';

        @Override
        public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType,
                MimeType mimeType, Map<String, Object> hints) {
            return Flux.from(publisher)
                    .scan(Tuples.<Flux<DataBuffer>, Optional<DataBuffer>>of(Flux.empty(), Optional.empty()),
                            (acc, buffer) -> {
                                List<DataBuffer> results = new ArrayList<>();
                                int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount();
                                Optional<DataBuffer> incomplete = acc.getT2();

                                while (startIdx < limit && endIdx != -1) {
                                    endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx);
                                    int length = (endIdx == -1 ? limit : endIdx) - startIdx;
                                    DataBuffer slice = buffer.slice(startIdx, length);

                                    DataBuffer tmp = incomplete
                                            .map(b -> b.write(slice))
                                            .orElse(buffer.factory().allocateBuffer(length).write(slice));
                                    tmp = DataBufferUtils.retain(tmp);

                                    if (endIdx != -1) {
                                        startIdx = endIdx + 1;
                                        results.add(tmp);
                                        incomplete = Optional.empty();
                                    } else {
                                        incomplete = Optional.of(tmp);
                                    }
                                }
                                releaseBuffer(buffer);

                                return Tuples.of(Flux.fromIterable(results), incomplete);
                            })
                    .flatMap(t -> {
                        t.getT2().ifPresent(this::releaseBuffer);
                        return t.getT1();
                    })
                    .map(buffer -> {
                        // charset resolution should in general use supplied mimeType
                        String s = UTF_8.decode(buffer.asByteBuffer()).toString();
                        releaseBuffer(buffer);
                        return s;
                    })
                    .log();
        }

        private void releaseBuffer(DataBuffer buffer) {
            boolean release = DataBufferUtils.release(buffer);
            if (release) {
                System.out.println("Buffer was released.");
            }
        }
    }

    public class StringDecoderTest {
        private StringDecoder stringDecoder = new StringDecoder();
        DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT);

        @Test
        public void testDecode() {
            Flux<DataBuffer> pub = Flux.just("abc\n", "abc", "def\n", "abc", "def\nxyz\n", "abc", "def", "xyz\n")
                    .map(s -> dataBufferFactory.wrap(s.getBytes(UTF_8)));

            StepVerifier.create(stringDecoder.decode(pub, null, null, null))
                    .expectNext("abc", "abcdef", "abcdef", "xyz", "abcdefxyz")
                    .verifyComplete();
        }
    }

I kept getting:

    [ERROR] (main) onError(io.netty.util.IllegalReferenceCountException: refCnt: 0)
    [ERROR] (main) - io.netty.util.IllegalReferenceCountException: refCnt: 0
    io.netty.util.IllegalReferenceCountException: refCnt: 0
        at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1415)
        at io.netty.buffer.UnpooledHeapByteBuf.nioBuffer(UnpooledHeapByteBuf.java:314)
        at io.netty.buffer.AbstractUnpooledSlicedByteBuf.nioBuffer(AbstractUnpooledSlicedByteBuf.java:434)
        at io.netty.buffer.CompositeByteBuf.nioBuffers(CompositeByteBuf.java:1496)
        at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1468)
        at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1205)
        at org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:234)
        at org.abhijitsarkar.java.StringDecoder.lambda$decode$4(StringDecoder.java:61)
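
The trace shows asByteBuffer failing on a buffer whose backing ByteBuf was already at refCnt 0. One likely contributing factor (illustrated below with the plain Netty API rather than the Spring DataBuffer wrapper, as a hedged sketch): slice() does not take its own reference, so a derived buffer shares the parent's reference count and dies with the parent unless it is retained explicitly.

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;

    public class SliceDemo {
        public static void main(String[] args) {
            ByteBuf parent = Unpooled.copiedBuffer("abc\n", StandardCharsets.UTF_8);

            ByteBuf slice = parent.slice(0, 3);          // shares the parent's memory AND refCnt
            ByteBuf safe = parent.retainedSlice(0, 3);   // bumps refCnt, survives the parent's release

            parent.release();                            // refCnt 2 -> 1 (retainedSlice keeps it alive)
            System.out.println(safe.toString(StandardCharsets.UTF_8));  // "abc"
            safe.release();                              // refCnt 1 -> 0, memory freed

            slice.getByte(0);                            // throws IllegalReferenceCountException: refCnt: 0
        }
    }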

Working code:

    public class StringDecoder extends AbstractDecoder<String> {
        private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';

        @Override
        public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType,
                MimeType mimeType, Map<String, Object> hints) {
            // retain/release below are static imports of DataBufferUtils.retain/release
            DataBuffer incomplete = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT).allocateBuffer(0);

            return Flux.from(publisher)
                    .scan(Tuples.<Flux<DataBuffer>, DataBuffer>of(Flux.empty(), retain(incomplete)),
                            (acc, buffer) -> {
                                List<DataBuffer> results = new ArrayList<>();
                                int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount();

                                while (startIdx < limit && endIdx != -1) {
                                    endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx);
                                    int length = (endIdx == -1 ? limit : endIdx) - startIdx;
                                    DataBuffer slice = buffer.slice(startIdx, length);
                                    byte[] slice1 = new byte[length];
                                    slice.read(slice1, 0, slice1.length);

                                    if (endIdx != -1) {
                                        byte[] slice2 = new byte[incomplete.readableByteCount()];
                                        incomplete.read(slice2, 0, slice2.length);
                                        // call retain to match release during decoding to string later
                                        results.add(retain(
                                                incomplete.factory().allocateBuffer()
                                                        .write(slice2)
                                                        .write(slice1)
                                        ));
                                        startIdx = endIdx + 1;
                                    } else {
                                        incomplete.write(slice1);
                                    }
                                }

                                return Tuples.of(Flux.fromIterable(results), incomplete);
                            })
                    .flatMap(Tuple2::getT1)
                    .map(buffer -> {
                        // charset resolution should in general use supplied mimeType
                        String s = UTF_8.decode(buffer.asByteBuffer()).toString();
                        return s;
                    })
                    .doOnTerminate(() -> release(incomplete))
                    .log();
        }
    }

The code could be cleaner if not for Spring bug SPR-16351.