netty的并发编码
编码器的编码方法会同时执行吗? 我观察到编码方法可能由不同的线程并发。 管道定义为:
Channels.pipeline( idleHandler, new AmfDecoder(GameEvent.class), new AmfEncoder(), concurrencyHandler, new WebHandler());
编码器:
public class AmfEncoder extends OneToOneEncoder{ private final SerializationContext serializationContext = new SerializationContext(); private final Amf3Output amfout = new Amf3Output(serializationContext); @Override protected Object encode(ChannelHandlerContext arg0, Channel arg1, Object arg2) throws Exception { T e = (T)arg2; ByteArrayOutputStream byteoutStreamSize = new ByteArrayOutputStream(); amfout.setOutputStream(byteoutStreamSize); amfout.writeObject(e.getBody()); // byteoutStreamSize has small probability become empty at here, in debug mode I can sure e.getBody() has data // I thought byteoutStreamSize might be empty by another thread call "amfout.flush()" or "amfout.reset()" amfout.flush(); //... amfout.reset(); }
}
Channel.write的调用不仅是线程属于netty的工作线程,也不属于Exeutionhandler中的线程。 有一个由我自己创建的线程池将调用Channel.write()。 将amfout&serializationContext的2个变量移动到encode()函数为局部变量后,问题就消失了。
Doc说ChannelPipeline是线程安全的,我读netty 3.4.5发现“添加”,“删除”…操作被锁定,但sendDownstream&sendUpstream没有锁定。 因此,如果存在不属于工作线程池或ExecutionHandler线程池的线程,并且所有这些线程都调用Channel.write(),则会在解码器和编码器中发生并发问题
Channel管道是线程安全的,但问题是下游事件和上游事件的事件执行模型不同:
-
默认情况下,使用(多个)用户线程执行下游处理程序。
-
默认情况下,下游处理程序不是线程安全的,因为它们可以由任何顺序的多个用户线程执行(通常,
DownstreamEvents
是轻量级的,因此它们的处理程序不会在实例变量中维护状态)。 看看Netty代码库中的OneToOneEncoder
实现。 他们都没有维持一个州。 -
上游处理程序默认使用单个线程执行,或者逐个使用多个线程执行(如果使用执行处理程序)。
-
由于单线程事件执行(即使它们可以保持可变状态),上游处理程序是线程安全的。
因此,有人可能会错误地认为下游处理程序与上游处理程序一样是线程安全的。
正如您所说,如果不需要state,解决方案是将实例变量移动到本地范围。 否则,使下游处理方法线程安全。
我认为你正确理解了并发性。 你必须:
- 确保您的通道处理程序是线程安全的(没有变异的实例变量)
- 使用ChannelPipelineFactory,以便为每个通道创建新管道。