具有Netty的multithreadingUDP服务器

我正在尝试使用Netty实现UDP服务器。 想法是只绑定一次(因此只创建一个Channel )。 此Channel仅使用一个处理程序进行初始化,该处理程序通过ExecutorService在多个线程之间调度传入数据报的处理。

 @Configuration public class SpringConfig { @Autowired private Dispatcher dispatcher; private String host; private int port; @Bean public Bootstrap bootstrap() throws Exception { Bootstrap bootstrap = new Bootstrap() .group(new NioEventLoopGroup(1)) .channel(NioDatagramChannel.class) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .handler(dispatcher); ChannelFuture future = bootstrap.bind(host, port).await(); if(!future.isSuccess()) throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause()); return bootstrap; } } @Component @Sharable public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean { private int workerThreads; private ExecutorService executorService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { DatagramPacket packet = (DatagramPacket) msg; final Channel channel = ctx.channel(); executorService.execute(new Runnable() { @Override public void run() { //Process the packet and produce a response packet (below) DatagramPacket responsePacket = ...; ChannelFuture future; try { future = channel.writeAndFlush(responsePacket).await(); } catch (InterruptedException e) { return; } if(!future.isSuccess()) log.warn("Failed to write response packet."); } }); } @Override public void afterPropertiesSet() throws Exception { executorService = Executors.newFixedThreadPool(workerThreads); } } 

我有以下问题:

  1. 在由工作线程使用之前, Dispatcher类的channelRead方法接收的DatagramPacket是否应该重复? 我想知道这个数据包是否在channelRead方法返回后被销毁,即使工作线程保留了引用。
  2. 在所有工作线程之间共享Channel并让它们同时调用writeAndFlush是否安全?

谢谢!

  1. 不。 如果你需要让对象存活更长时间,你可以把它变成别的东西,或者一旦你完成它就使用ReferenceCountUtil.retain(datagram)然后使用ReferenceCountUtil.retain(datagram) 。 您也不应该在执行程序服务上执行await() ,您应该为发生的任何事情注册处理程序。

  2. 是的,通道对象是线程安全的,可以从许多不同的线程调用它们。