每个UDP数据报的Netty不同管道

我们已经有一个已经在TCP / IP中实现的服务器,但我们现在要求协议也支持UDP。

发送的每个UDP数据报都包含我需要解码的所有内容,因此它是一个非常简单的回复和响应系统,数据报中的数据由换行符分隔。

启动服务器时引导程序的代码如下所示:

//SETUP UDP SERVER DatagramChannelFactory udpFactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool()); ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap(udpFactory); udpBootstrap.setOption("sendBufferSize", 65536); udpBootstrap.setOption("receiveBufferSize", 65536); udpBootstrap.setOption("receiveBufferSizePredictorFactory", new AdaptiveReceiveBufferSizePredictorFactory()); udpBootstrap.setOption("broadcast", "true"); udpBootstrap.setPipelineFactory(new ServerPipeLineFactoryUDP()); udpBootstrap.bind(new InetSocketAddress(hostIp, 4000)); 

管道代码是:

 class ServerPipeLineFactoryUDP implements ChannelPipelineFactory { private final static ExecutionHandler EXECUTION_HANDLER = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(ScorpionFMS.THREAD_POOL_COUNT, 0, 0)); public ServerPipeLineFactoryUDP() { } @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("debugup", new DebugUpstreamHandler("UDP")); pipeline.addLast("debugdown", new DebugDownstreamHandler("UDP")); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(256, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new UDPRequestDecoder(true)); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("executor", EXECUTION_HANDLER); pipeline.addLast("handler", new UDPRequestHandler( return pipeline; } } 

我遇到的问题是每个数据报都在使用该管道的相同实例(我希望每个数据报都使用管道的新实例),因此在处理数据报内容时存储的所有状态都被保存,下一个数据报使用它也是,(对于TCP,每个连接都有自己的通道,因此它自己的管道实例和它自己的状态)

我知道这是阅读文档时的预期行为,但无论如何都要强制netty为每个数据报重新创建管道? 或者我是以完全错误的方式解决这个问题的?

简而言之,我希望每个数据报都有一个新的管道实例(与tcp相同)

就像我在IRC中所说的那样,我认为可以做你想做的事情或至少给你一些想法。

 public class Example { public static void main(String[] args) { final ChannelPipelineHandlerImpl perDatagramFactory = new ChannelPipelineHandlerImpl(); DatagramChannelFactory udpFactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool()); ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap(udpFactory); udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new DistinctChannelPipelineHandler(perDatagramFactory)); } }); } private static final class DistinctChannelPipelineHandler implements ChannelDownstreamHandler, ChannelUpstreamHandler { private ChannelPipelineFactory factory; public DistinctChannelPipelineHandler(ChannelPipelineFactory factory) { this.factory = factory; } public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { ChannelPipeline pipeline = factory.getPipeline(); pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink()); pipeline.sendUpstream(e); ctx.sendUpstream(e); } public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { ChannelPipeline pipeline = factory.getPipeline(); pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink()); pipeline.sendDownstream(e); ctx.sendDownstream(e); } } private static final class ChannelPipelineHandlerImpl implements ChannelPipelineFactory { public ChannelPipeline getPipeline() throws Exception { // Add your handlers here return Channels.pipeline(); } } } 

我不确定如何处理UDP通道,但如果每个数据报的通道不同,您可以将状态存储在ChannelLocal中 。