ChannelPipeline.md

801

一个 Channel 都有一个 ChannelPipeline 。以下是 AbstractChannel 的构造方法:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

因此,一般来说 Channel 使用的是 默认的管道 也就是 DefaultChannelPipeline ,我们来到该类的构造方法:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

可以看到它实现了一个双向链表,其中头结点是 TailContext,尾结点是 HeadContext:

image20210809195931563.png
image20210808101744743.png

头结点和尾结点同时实现了 ChannelHandler 与 ChannelHandlerContext 接口,从变量类型可以看出双向链表的类型为 ChannelHandlerContext,而我们中间传入的 Handler 只是作为 Context 的一个属性,我们可以来看看我们添加 Handler 的地方:

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
        }
        //p.addLast(new LoggingHandler(LogLevel.INFO));
        p.addLast(new EchoClientHandler());
    }
});

我们调用了 p.addLast 方法,参数为 ChannelHandler 类型,因此我们来到该方法:

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}

最终会调用 带三个参数的方法:

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

首先是同步锁,然后 newCtx = newContext(group, filterName(name, handler), handler); ,可以看出 使用 newContext 方法 将我们传入的 Handler 打包成 Context,而该方法最终是构造了一个 DefaultChannelHandlerContex:

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}

因此对于双向链表,其结点为 AbstractContext,而我们传入的 Handler 为其一个参数,但 HeadContext 与 TailContext 本身就实现了两个对象,其 handler 方法 返回 this 。