17-Netty管道机制
| 版本 | 内容 | 时间 | 
|---|---|---|
| V1 | 新建 | 2022年03月16日12:30:42 | 
| V2 | 重构 | 2023年05月29日01:01:33 | 
管道相关的类的关系
Netty 管道中涉及到三个接口:
- ChannelPipeline:管道接口,定义了管道内的 ChannelHandler 的增删改查的操作,而且提供了管道内的传播的事件的API;
- ChannelHandlerContext:管道是一个双向链表,ChannelHandlerContext 就是链表上的节点对象,是管道内的事件传播的具体实现。ChannelHandlerContext 内部封装着 ChannelHandler 处理器;
- ChannelHandler:处理器,Netty 自带的处理器和用户自定义的处理器;
总的来说:ChannelPipeline 是一个拦截流经 Channel 的入站和出站事件的 ChannelHandler 实例链,可以看成是ChannelHandler 的容器,新建的 Channel 都会被分配一个新的 ChannelPipeline。在 Channel 的 Pipeline 中,存放着许多 ChannelHandlerContext ,ChannelHandlerContext 通过前后指针连接组成链表,每个 ChannelHandlerContext 内部封装一个 ChannelHandler 对象。事件通过 ChannelHandlerContext 对象来进行前后传递,ChannelHandlerContext 内部的 ChannelHandler 来处理传递进来的事件。
一个管道的示例图如下:

管道的创建
管道创建的入口
每一个 Channel 都有一个自己的 pipeline。NioServerSocketChannel 和 NioSocketChannel 创建的流程中,最终在父类 AbstractChannel 内会初始化它们内部的 Pipeline。
下面看下 AbstractChannel 的构造方法:
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 给Channel实例分配一个唯一的ID对象
    id = newId();
    // 封装一个unsafe对象
    // 当Channel是NioServerSocketChannel时,Unsafe实例是NioMessageUnSafe
    // 当Channel是NioSocketChannel时,实例是NioByteUnSafe
    unsafe = newUnsafe();
    // 构建Channel消息处理管道Pipeline
    // 设置好两个节点(默认的处理器),一个头结点HeadContext,一个尾节点TailContext
    pipeline = newChannelPipeline();
}创建 Channel 的 pipeline 的关键就是在 io.netty.channel.DefaultChannelPipeline 方法里:
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}其实就是 new 一个 DefaultChannelPipeline 对象。
DefaultChannelPipeline
继承关系

前面在 AbstractChannel 中调用的构造方法如下:
// 管道的头节点
final AbstractChannelHandlerContext head;
// 管道的尾节点
final AbstractChannelHandlerContext tail;
// 当前管道所属的 Channel 通道
private final Channel channel;
/**
 * 创建一个实例
 * 初始化一个头节点和尾节点
 *
 * @param channel
 */
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}首先就是将当前 ChannelPipeline 属于的 Channel 保存到自己的属性中,然后创建两个 ChannelHandlerContext 类型的 HeadContext 和 TailContext,分别代表 Pipeline 的双向链表的头节点和尾结点。
管道的初始状态如下:
--------------           ---------------
|			 |  ------>  |             |
| HeadContext|           | TailContext |
|			 |  <------  |             |
--------------           ---------------HeadContext

HeadContext 是 Pipeline 内部双向链表的头节点,实现了 ChannelHandlerContext 接口。
HeadContext 又实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,说明 HeadContext 即是一个 ChannelHandlerContext 又是一个 ChannelHandler ,它可以同时处理 Inbound 入站事件和 Outbound 出站事件。
另外 HeadContext 内部有个 Unsafe 类的字段,Unsafe 是提供 Channel 类的底层操作的接口,例如读、写、连接和绑定等I/O 操作,这也说明 I/O 事件在 pipeline 中的传播最终会落在 HeadContext 中进行最后的 I/O 处理。
下面是 HeadContext 提供的 API,可以看到既有出站事件的 API,也有入站事件的 API。

TailContext

TailContext 是 Pipeline 内部双向链表的头节点,实现了 ChannelHandlerContext 接口。
TailContext 又实现了 ChannelInboundHandler 接口,说明 TailContext 即是一个 ChannelHandlerContext 又是一个 ChannelHandler ,它可以处理 Inbound 入站事件。
下面是 TailContext 提供了 API,可以看到提供了入站事件的 API。

我们知道入站事件是从 HeadContext 向 TailContext 传播的,既然 TailContext 也是一个 ChannelInboundHandler,那么 TailContext 作为入站处理器的作用是什么呢?假如我们自定义的入站处理器没有处理某次传递的 channelRead 事件,那么需要一个兜底的处理,TailContext 就是作为一个兜底的处理器,比如释放可能未释放的堆外内存。
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}管道的增删改查
Pipeline 添加 ChannelHandler 的 API 有很多,主要如下:
- addFirst 系列;
- addBefore 系列;
- addAfter 系列;
- addLast 系列;
以 addLast 方法为例分析
DefaultChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler)
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 检查是否可以多个线程使用这个Handler    @Sharable
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        // 将newCtx放到Head和tail之间
        addLast0(newCtx);
        // 如果 registered 是 false 则表示 Channel 还未注册到 EventLoop 上去
        // 此时会添加 ctx 到 pipeline,然后添加一个任务去 ChannelHandler.handlerAdded(...)
        if (!registered) {
            newCtx.setAddPending();
            // 添加一个任务
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        // 执行到这里,说明添加的ChannelHanddler已经完成了注册
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}- 首先因为 addLast 方法可能会有很多线程同时调用,所以需要使用 synchronized 保证线程安全; 
- Netty 中未被 @Sharable 注解标记的 ChannelHandler 是不允许重复添加的。 - checkMultiplicity(handler)就是用于校验这个的。需要注意的是使用共享 ChannelHandler 的时候需要确保其线程安全性;
- 创建 ChannelHandlerContext 实例,封装一下 ChannelHandler。ChannelHandlerContext 关注事件传播,ChannelHandler 关注的是业务处理; 
- addLast0(newCtx)将新创建的 ChannelHandlerContext 插入到链表末尾(其实是 TailContext 的前一个节点)。需要注意的是刚插入到链表中的 ChannelHandlerContext 还只是 INIT 初始状态。当 ChannelHandler 的 handlerAdded 方法被回调时,状态才变为 ADD_COMPLETE ,而只有 ADD_COMPLETE 状态的 ChannelHandler 才能响应 pipeline 中传播的事件;
- 回调 ChannelHandler 的 handlerAdded 方法有几种情况: - Channel 还未注册到 NioEventLoop 上,也就是说服务端或者客户端还未启动完成,此时会封装成一个任务添加到一个单链表中,等待 Channel 注册到 NioEventLoop 上时,就会触发执行这个任务去执行 handlerAdded 方法;
 - if (!registered) { newCtx.setAddPending(); // 添加一个任务 callHandlerCallbackLater(newCtx, true); return this; }- 这里主要是保证执行 handlerAdded 方法的线程是 Channel 指定的 EventExecutor;
 - // 执行到这里,说明添加的ChannelHanddler已经完成了注册 EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this;
管道的增删查改的源码比较简单,就是针对双向链表的操作,其他的 API 就不分析了。
DefaultChannelHandlerContext
继承体系

DefaultChannelHandlerContext 类
DefaultChannelHandlerContext 类就一个字段,就是保存的它封装的 ChannelHandler 实例。
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        // 参数1:当前ctx所属的pipeline
        // 参数2:执行器,
        // 参数3:名字
        // 参数4:ctx封装的handler的字节码对象
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }
    @Override
    public ChannelHandler handler() {
        return handler;
    }
}AbstractChannelHandlerContext 类
常量和成员变量
AbstractChannelHandlerContext 中有两个自身类型的字段,这就是双向链表中的前驱和后驱指针。
// 后驱节点
volatile AbstractChannelHandlerContext next;
// 前驱节点
volatile AbstractChannelHandlerContext prev;AbstractChannelHandlerContext 上下文对象的状态,默认是初始状态 INIT。
/**
 * 表示 handlerAdded 方法即将被调用
 */
private static final int ADD_PENDING = 1;
/**
 * 表示 handlerAdded 方法已经被调用
 */
private static final int ADD_COMPLETE = 2;
/**
 * 表示 handlerRemoved 已经被调用
 */
private static final int REMOVE_COMPLETE = 3;
/**
 * 初始状态, handlerAdded 和 handlerRemoved 方法都没有被调用
 */
private static final int INIT = 0;
private volatile int handlerState = INIT;代表状态的 handlerState 字段,要保证它的更新是原子性的,所以提供了一个原子更新器:
// handlerState 字段改变的 CAS 更新器
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
    AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");其他属性:
// 当前ctx所属的pipeline
private final DefaultChannelPipeline pipeline;
// 当前ctx的名字
private final String name;
// 当前ctx是否有序
private final boolean ordered;
// 用来判断是否跳过执行器 ChannelHandler 的某些事件处理方法
// io.netty.channel.ChannelHandlerMask.mask0 计算得到
private final int executionMask;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
// 如果这个值是 null,那么上下文的执行器用的就是所属通道 Channel 的事件轮询器。
final EventExecutor executor;解析一下这几个属性:
- DefaultChannelPipeline pipeline:标记当前 ChannelHandlerContext 是属于那个管道的;
- String name:表示当前 ChannelHandlerContext 的名字;
- boolean ordered:一般情况下,必须当 ChannelHandlerContext 上下文状态是 ADD_COMPLETE 才能够处理管道中的事件,假如 ordered 是 true 的情况下,上下文状态是 ADD_PENDING 状态就可以处理管道中的事件了;
- int executionMask:掩码。用于标记当前 ChannelHandlerContext 内部的 ChannelHandler 是入站处理器还是出站处理器,能够处理那些事件;
- EventExecutor executor:channelHandler 对应的 executor,除非特殊指定,默认就是 Channel 的绑定的 NioEventLoop;
构造方法
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    // mask方法用于计算一个掩码,作用是方便ctx前后传递时,查找合适的下一个ctx
    this.executionMask = mask(handlerClass);
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    // 表示上下文的事件执行器是不是有序的,即以有序/串行的方式处理所有提交的任务。
    // executor == null,说明当前上下文用的是通道Channel的 channel().eventLoop(),这个肯定是有序的
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}构造方法中调用了 ChannelHandlerMask 类的一个重要的 mask 方法,重点分析 mask 方法。mask 方法的作用是,**根据传入的 ChannelHandler 的字节码对象,来计算出一个掩码,可以根据这个掩码来判断当前 ChannelHandler 是否重写过某些方法。**也就是说,根据这个掩码来标记 ChannelHandler 能够处理那些事件。
@Skip 注解的作用
在分析 io.netty.channel.ChannelHandlerMask#mask 方法之前,得先了解一下 @Skip 注解。
ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类中每个方法都有 @Skip 注解,这个注解后面将在 mask 方法中用于判断自己实现的 ChannelHandler 类是否重写过其中的某种方法,因为我们自己写的 ChannelHandler 中是没有标注 @Skip 注解的。
ChannelHandlerMask 中有个 isSkippable 方法,根据反射判断某个方法是否有 @Skip 注解。
private static boolean isSkippable(
        final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
    return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
        @Override
        public Boolean run() throws Exception {
            Method m;
            try {
                m = handlerType.getMethod(methodName, paramTypes);
            } catch (NoSuchMethodException e) {
               	// ..... 省略日志打印 ......
                return false;
            }
            return m.isAnnotationPresent(Skip.class);
        }
    });
}关于 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 中的方法,其实就是对应的入站和出站事件,举个例子:ChannelInboundHandlerAdapter#channelRegistered
/**
 * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
 * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
 *
 * Sub-classes may override this method to change behavior.
 *
 * 调用 ChannelHandlerContext.fireChannelRegistered() 方法
 * 转发到 ChannelPipeline 中的下一个 ChannelInboundHandler。
 */
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelRegistered();
}ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类中每个方法都有 @Skip 注解,而我们自己要实现 ChannelHandler,一般也就是继承这两个类去实现的,但是我们自己实现的 ChannelHandler 重写它的方法的时候是不会自己加 @Skip 注解的(除非你非要自己手动加......),所以,这就可以标记出来那些方法是我们自己的 ChannelHandler 重写过的。
ChannelHandlerMask 类
ChannelHandlerMask 类就是处理 @Skip 注解的关键。
前面在分析 AbstractChannelHandlerContext 类的构造方法的时候看到过一个 executionMask 字段,这个字段的作用就是 用于标记当前 ChannelHandlerContext 内部的 ChannelHandler 是入站处理器还是出站处理器,能够处理那些事件。计算这个字段的入口就是 ChannelHandlerMask#mask 方法。
ChannelHandlerMask 的常量
在看 ChannelHandlerMask#mask 方法之前,得先了解一下 ChannelHandlerMask 类的一些常量的含义:
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;这些常量是按位标记每个方法,如代表异常的方法的常量 MASK_EXCEPTION_CAUGHT,它的二进制就是0000 0000 0000 0000 0000 0000 0000 0001。
根据这些常量的组合的属性
- MASK_ONLY_INBOUND:入站事件掩码;
- MASK_ALL_INBOUND:入站事件掩码,包含 MASK_EXCEPTION_CAUGHT;
- MASK_ONLY_OUTBOUND:出站事件的掩码;
- MASK_ALL_OUTBOUND:出站事件的掩码,包含 MASK_EXCEPTION_CAUGHT;
// 计算入站事件掩码
// 0000 0000 0000 0000 0000 0001 1111 1110
static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
        MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
        MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
// 计算入站事件掩码,包含 MASK_EXCEPTION_CAUGHT
// 0000 0000 0000 0000 0000 0001 1111 1111
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
// 计算出出站事件的掩码
// 0000 0000 0000 0001 1111 1110 0000 0000
static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
        MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
// 计算出出站事件的掩码 包含 MASK_EXCEPTION_CAUGHT
// 0000 0000 0000 0001 1111 1110 0000 0001
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;ChannelHandlerMask#mask
/**
 * Return the {@code executionMask}.
 */
static int mask(Class<? extends ChannelHandler> clazz) {
    // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
    // lookup in the future.
    Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
    Integer mask = cache.get(clazz);
    if (mask == null) {
        mask = mask0(clazz);
        cache.put(clazz, mask);
    }
    return mask;
}其中 MASKS 是一个 FastThreadLocal,用来缓存 ChannelHandler 子类对应的执行标记 mask,就不用每次都计算掩码了。
// 用来缓存, ChannelHandler 子类对应的执行标记 mask,就不用每次都需要计算了
private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
    new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
    @Override
    protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
        return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
    }
};可以看到最终是调用 io.netty.channel.ChannelHandlerMask#mask0 方法了,方法的返回值是一个int的二进制,
- 假如对应下标位(代表指定方法) 的值是 1,表示 handlerType 类型的 ChannelHandler 中重写了该方法;
- 位值是 0 表示 handlerType 类型的 ChannelHandler 中没有重写该方法;
ChannelHandler 的掩码包含的是该 ChannelHandler 重写的事件方法的掩码集合。当事件在 pipeline 中传播的时候,在 ChannelHandlerContext 中可以利用这个掩码来判断,当前 ChannelHandler 是否应该响应这个事件,也就是是否要执行这个 ChannelHandler 里面的逻辑了。
/**
 * Calculate the {@code executionMask}.
 * 返回值是一个int类型的 二进制
 * 对应下标位 代表指定方法 位的值是1,表示方法handlerType类型中实现了该方法
 * 位置是0表示handlerType类型中 没有实现该方法
 */
private static int mask0(Class<? extends ChannelHandler> handlerType) {
    // 0000 0000 0000 0000 0000 0000 0000 0001
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        // 条件成立 说明handlerType类型是属于ChannelInBoundHandler的子类
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            // 结果 0000 0000 0000 0000 0000 0001 1111 1111
            mask |= MASK_ALL_INBOUND;
            // 参数1:handler的真实class类型
            // 参数2:检查的方法名
            // 参数3:ChannelHandlerContext.class
            // isSkippable返回handlerType这个class 有没有重写指定的方法,重写之后指定方法上的Skip注解就没有了
            // 条件成立,说明没有重写该方法
            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                //     0000 0000 0000 0000 0000 0001 1111 1111
                // 取反 1111 1111 1111 1111 1111 1111 1111 1101
                // &   0000 0000 0000 0000 0000 0001 1111 1101
                // 也就是说 假如你自己实现了这些方法 该位置就是1,没有实现该方法 该位置就是0
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_UNREGISTERED;
            }
            if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_ACTIVE;
            }
            if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_INACTIVE;
            }
            if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_CHANNEL_READ;
            }
            if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_READ_COMPLETE;
            }
            if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
            }
            if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_USER_EVENT_TRIGGERED;
            }
        }
        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_OUTBOUND;
            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_BIND;
            }
            if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_CONNECT;
            }
            if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DISCONNECT;
            }
            if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_CLOSE;
            }
            if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DEREGISTER;
            }
            if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                mask &= ~MASK_READ;
            }
            if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                    Object.class, ChannelPromise.class)) {
                mask &= ~MASK_WRITE;
            }
            if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                mask &= ~MASK_FLUSH;
            }
        }
        // "exceptionCaught" 方法需要单独判断,因为 ChannelInboundHandler 和 ChannelOutboundHandler 都有它
        if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
            mask &= ~MASK_EXCEPTION_CAUGHT;
        }
    } catch (Exception e) {
        // Should never reach here.
        PlatformDependent.throwException(e);
    }
    return mask;
}管道的事件传播
入站的事件传播
管道传递事件入口
以 ChannelRead 事件为例来分析入站事件的传播,调用 pipeline 的 API 传递事件,例如:
pipeline.fireChannelRead(byteBuf);DefaultChannelPipeline#fireChannelRead
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}可以看到就是调用 AbstractChannelHandlerContext 的静态方法 invokeChannelRead 出处理事件传播,其中入参:
- head 参数:就是管道内的 HeadContext,从这里也说明了,入站事件就是从 HeadContext 向 TailContext 方向传播的;
- mag 参数:就是管道内传递的要处理的数据;
好了,继续跟进看 AbstractChannelHandlerContext 的 invokeChannelRead 方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    // 资源泄漏相关
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 保证是 AbstractChannelHandlerContext 绑定的线程执行 invokeChannelRead 方法
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}调用了 invokeChannelRead 的重载方法:
private void invokeChannelRead(Object msg) {
    // invokeHandler 判断 ChannelHandler 的状态,确保 ChannelHandler 能够处理事件
    if (invokeHandler()) {
        try {
            // 调用 ChannelHandler 的对应处理方法
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            // 如果发生异常,则就 ChannelHandler 的 exceptionCaught 方法
            invokeExceptionCaught(t);
        }
    } else {
        // ChannelHandler 的状态不对,无法处理该事件,继续向后传播
        fireChannelRead(msg);
    }
}ChannelHandlerContext 传递事件
关键点就是调用 ChannelHandler 的对应的 channelRead 方法。一般在 ChannelHandler 中处理完后,都会调用 AbstractChannelHandlerContext 对应的 fire 系列方法,例如 fireChannelRead 方法。
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}通过 AbstractChannelHandlerContext 的 findContextInbound 方法找到下一个能够处理 channelRead 方法的处理器
/**
 * 找到下一个入站Handler
 *
 * @param mask
 */
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.next;
        // 0000 0000 0000 0000 0000 0001 1111 1110
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}就是一个 do...while... 循环,在双向链表中向后查找 ChannelHandlerContext 中封装的 ChannelHandler 是否能够处理对应的入站事件,关键点就是 while 条件的 skipContext 方法。
private static boolean skipContext(
        AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
    // (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0)
    // 只有当 EventExecutor 相同的时候,才会考虑是否跳过 ctx,因为我们要保证事件处理的顺序。
    // onlyMask 0000 0000 0000 0000 0000 0001 1111 1110
    // mask     0000 0000 0000 0000 0000 0000 0000 0010
    return (ctx.executionMask & (onlyMask | mask)) == 0 ||
            // See https://github.com/netty/netty/issues/10067
            (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}方法的返回值表示是否需要跳过这个 ctx 上下文,返回 true 表示跳过。
- (ctx.executionMask & (onlyMask | mask)) == 0- 假如返回 true,表示当前这个 ctx 对象的 executionMask 掩码中没有 (onlyMask | mask)中的任何方法,也就很容易就判断 ChannelHandler 不属于入站事件或者出站事件,则返回 true,表示跳过这个 ctx;
- 返回 false,表示自定义的 ChannelHandler 是对应的出站处理器或者入站处理器;
 
- 假如返回 true,表示当前这个 ctx 对象的 executionMask 掩码中没有 
- (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0)- ctx.executor() == currentExecutor,这个判断是为了保证事件处理的顺序,如果事件的执行器不一样,无论是否被 @skip 注解标记也不能跳过这个 ctx;
- (ctx.executionMask & mask) == 0,判断自定义 ChannelHandler 是否重写过指定方法,假如重写过 executionMask 掩码的指定位置就是 1 ,此时返回 false,表示不跳过这个 ctx;
 
findContextInbound 方法结束后,找到了一个封装了实现了 channelRead 方法的 ChannelHandler 对象的 ctx。拿到后面的 ctx 后,就会调用 invokeChannelRead 方法,就去执行该 ChannelHandler 处理 channelRead 事件的逻辑了。
出站事件的传播
以 bind 事件为例来分析出站事件的传播,AbstractChannel#bind(SocketAddress, ChannelPromise):
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}可以看到就是调用 pipeline 的 API
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}调用 TailContext 的 bind 方法,该 bind 方法的具体实现是在 TailContext 的抽象父类 AbstractChannelHandlerContext 中。
AbstractChannelHandlerContext#bind(SocketAddress, ChannelPromise)
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }
    // 找到上一个出站处理器的上下文对象
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}关键点就是 findContextOutbound 方法了,从双向链表的尾部向前查找出站处理器去处理 bind 事件,最终会走到 HeadContext 节点。
/**
 * 从双向链表的tail开始,不断向前找
 * @param mask
 * @return
 */
private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.prev;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}异常事件的传播
异常事件的传播分为两种,
- 一种是 Netty 框架自身处理导致的异常,此时是从 HeadContext 向后传播异常事件;
- 一种是某个 ChannelHandler 中处理事件逻辑发生异常,这时该 ChannelHandler 的 exceptionCaught 方法会被回调。用户可以在这里处理异常事件,并决定是否通过 ctx.fireExceptionCaught(cause) 继续向后传播异常事件。
Netty 自身的异常
以客户端 Channel 处理 READ 事件为例,NioByteUnsafe#read
@Override
public final void read() {
    // ...... 省略 ......
    try {
        // ...... 省略业务处理逻辑 ......
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // ...... 代码块 .....
    }
}NioByteUnsafe#handleReadException
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, 
                                 Throwable cause, boolean close,
                                 RecvByteBufAllocator.Handle allocHandle) {
    if (byteBuf != null) {
        if (byteBuf.isReadable()) {
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
        } else {
            byteBuf.release();
        }
    }
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    pipeline.fireExceptionCaught(cause);
    // If oom will close the read event, release connection.
    // See https://github.com/netty/netty/issues/10434
    if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
        closeOnRead(pipeline);
    }
}这个方法的 pipeline.fireExceptionCaught(cause) 代码就是在管道中传播异常事件。
ChannelHandler 处理事件逻辑的异常
假如在 ChannelHandler 中处理 ChannelActive 事件的逻辑时发送异常,就会触发改 ChannelHandler 中的 exceptionCaught 方法:
AbstractChannelHandlerContext#invokeChannelActive()
private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelActive();
    }
}Task 特殊事件处理 - 减少实例创建
AbstractChannelHandlerContext 的内部类 Tasks
private static final class Tasks {
    private final AbstractChannelHandlerContext next;
    // channelReadComplete 读完成的入站事件
    private final Runnable invokeChannelReadCompleteTask = new Runnable() {
        @Override
        public void run() {
            next.invokeChannelReadComplete();
        }
    };
    // read 设置读的出站事件
    private final Runnable invokeReadTask = new Runnable() {
        @Override
        public void run() {
            next.invokeRead();
        }
    };
    // channelWritabilityChanged 可读状态改变的入站事件
    private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
        @Override
        public void run() {
            next.invokeChannelWritabilityChanged();
        }
    };
    // flush 刷新数据的出站事件
    private final Runnable invokeFlushTask = new Runnable() {
        @Override
        public void run() {
            next.invokeFlush();
        }
    };
    Tasks(AbstractChannelHandlerContext next) {
        this.next = next;
    }
}有的入站和出站事件的处理,与上面的流程不一样,有四个事件:
- channelReadComplete:读完成的入站事件;
- channelWritabilityChanged:可读状态改变的入站事件;
- read:读的出站事件;
- flush:刷新数据的出站事件;
这样做法的好处是减少任务实例的创建,比如 channelReadComplete 事件
@Override
public ChannelHandlerContext fireChannelReadComplete() {
    invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
    return this;
}
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelReadComplete();
    } else {
        Tasks tasks = next.invokeTasks;
        if (tasks == null) {
            next.invokeTasks = tasks = new Tasks(next);
        }
        executor.execute(tasks.invokeChannelReadCompleteTask);
    }
}前面的一些方法的 else 分支下面一般是 executor.execute(new Runnable(){...}); 创建一个任务。
而上面这四个事件是获取 Task 类中对应的任务,减少 Runnable 实例的创建,因为这些事件的处理方法是没有参数的。
WriteTask - 写操作事件
private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        // 检查 promise 是否有效
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    // 找出上一个出站的ctx对象
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    // 添加附加信息,在内存泄露的时候,可以获取到这个附加信息
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            // 如果需要刷新,就调用 invokeWriteAndFlush 方法
            next.invokeWriteAndFlush(m, promise);
        } else {
            // 如果不需要刷新,就调用 invokeWrite 方法
            next.invokeWrite(m, promise);
        }
    } else {
        // 将写操作封装成一个 WriteTask,是 Runnable 子类。
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}else 分支内封装了一个 WriteTask 对象来执行,WriteTask 是一个 Runable 对象。具体后续配合出站缓冲区分析。
小结
本篇分析了 Channel、ChannelHandler、ChannelHandlerContext、ChannelPipeline 之间的关系,已经入站事件和出站事件是如何传播的。
- 每个 Channel 都有一个自己的 ChannelPipeline;
- 每个 ChannelPipeline 都是一个双向链表,链表上的节点类型是 ChannelHandlerContext,每个节点中封装一个 ChannelHandler;
ChannelHandlerContext 主要负责事件的传播,而 ChannelHandler 主要处理事件的逻辑处理。两者各司其职,符合单一职责的思想。用户只需要关注 ChannelHandler 中的事件业务逻辑处理,无需关注事件是如何传播的。