3

编码器的编码方法会同时执行吗?我观察到编码方法可能由不同的线程并发。管道定义为:

Channels.pipeline(
    idleHandler,
    new AmfDecoder<GameEvent>(GameEvent.class),
    new AmfEncoder<GameEvent>(),
    concurrencyHandler,
    new WebHandler());

编码器:

public class AmfEncoder<T extends IAmfEvent> extends OneToOneEncoder{
private final SerializationContext serializationContext = new SerializationContext();
private final Amf3Output amfout = new Amf3Output(serializationContext);

@Override
protected Object encode(ChannelHandlerContext arg0, Channel arg1,
        Object arg2) throws Exception {
    T e = (T)arg2;
    ByteArrayOutputStream byteoutStreamSize = new ByteArrayOutputStream();
    amfout.setOutputStream(byteoutStreamSize);
    amfout.writeObject(e.getBody());
    // byteoutStreamSize has small probability become empty at here, in debug mode I can sure e.getBody() has data
    // I thought byteoutStreamSize might be empty by another thread call "amfout.flush()" or "amfout.reset()"
    amfout.flush();
    //...
    amfout.reset();
}

}

Channel.write的调用不仅是属于netty工作线程的线程,还是属于Exeutionhandler中的线程。有一个我自己创建的线程池将调用 Channel.write()。在我将 amfout 和 serializationContext 的 2 个变量移动到 encode() 函数中作为局部变量后,问题就消失了。

Doc 说 ChannelPipeline 是线程安全的,我阅读 netty 3.4.5 发现“添加”、“删除”......操作被锁定,但 sendDownstream 和 sendUpstream 没有锁定。所以如果有不属于worker线程池或ExecutionHandler线程池的线程,并且所有这些线程都调用Channel.write(),解码器和编码器会出现并发问题

4

2 回答 2

6

Channel 管道是线程安全的,但这里的问题是下游事件和上游事件的事件执行模型不同:

  • 默认情况下,使用(多个)用户线程执行下游处理程序。

  • 默认情况下,下游处理程序不是线程安全的,因为它们可以由多个用户线程以任何顺序执行(通常DownstreamEvents是轻量级的,因此它们的处理程序不会在实例变量中维护状态)。查看OneToOneEncoderNetty 代码库中的实现。他们都没有保持状态。

  • 上游处理程序默认使用单个线程或一个一个地使用多个线程执行(如果使用执行处理程序)。

  • 由于单线程事件执行(即使它们可以保持可变状态),上游处理程序是线程安全的。

因此,有人可能会错误地认为下游处理程序与上游处理程序一样是线程安全的。

正如您所说,如果不需要状态,解决方案是将实例变量移动到本地范围。否则,使下游处理方法线程安全。

于 2012-05-22T12:39:29.630 回答
0

我认为您正确理解了并发性。您必须:

  1. 确保您的通道处理程序是线程安全的(没有变异的实例变量)
  2. 使用 ChannelPipelineFactory 以便为每个通道创建一个新管道。
于 2012-05-22T11:58:26.153 回答