


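Bootstrap is where everything starts: it is a factory-like helper class used to initialize a Netty client or server. This article uses the echo example from the Netty source tree to analyze the client-side Bootstrap class.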


Open example/src/main/java/io/netty/example/echo/; the code is as follows:

public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

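The initialization steps are as follows (SSL is not analyzed for now):

  1. Instantiate a Bootstrap
  2. Set the group
  3. Set the channel
  4. Set the relevant configuration with option
  5. Set the handler

When setting the handler, you pass in a channel initializer (ChannelInitializer). Inside it you can obtain the ChannelPipeline object and add handlers to it.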


Let's look at NioSocketChannel first.


NioSocketChannel implements Netty's SocketChannel interface (io.netty.channel.socket.SocketChannel). A new NioSocketChannel object is created for every new socket connection.

NioSocketChannel is the NIO-based abstraction of a TCP connection; its UDP counterpart is NioDatagramChannel.

NioSocketChannel instantiation

In Bootstrap we called channel(NioSocketChannel.class) to specify the type to instantiate:

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
        ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

This ends up in the channelFactory method:

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

self() simply returns this, cast to the concrete builder type. The main effect here is to store a channelFactory object.
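The idea of a reflective factory can be sketched with JDK reflection alone. The sketch below is not Netty's ReflectiveChannelFactory; ReflectiveFactorySketch and DemoChannel are hypothetical names used only for illustration. It caches the public no-argument constructor once and reuses it for every new instance.

```java
import java.lang.reflect.Constructor;

// Hypothetical sketch of a factory that looks up a no-arg constructor once and reuses it.
final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Resolve the constructor at configuration time so that errors surface early.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<DemoChannel> factory = new ReflectiveFactorySketch<>(DemoChannel.class);
        // Every call creates a fresh object, much like one channel per connection.
        System.out.println(factory.newInstance() != factory.newInstance());
    }
}

// Hypothetical stand-in for a channel type; not a Netty class.
class DemoChannel {
    public DemoChannel() { }
}
```

Resolving the constructor in the factory's own constructor means a class without a usable constructor is rejected when the factory is configured, not when the first connection is attempted.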

The actual instantiation happens inside connect. Before connecting, the initAndRegister method is called:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}


channel = channelFactory.newChannel();

As the source above shows, the channelFactory here is ultimately a ReflectiveChannelFactory. Its relevant methods are:

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                                           " does not have a public non-arg constructor", e);
    }
}

public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

Reflection is used to invoke the Channel's no-argument constructor, so NioSocketChannel is instantiated here through its no-argument constructor:

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

    /**
     * Create a new instance using the given {@link SelectorProvider}.
     */
    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

    /**
     * Create a new instance using the given {@link SocketChannel}.
     */
    public NioSocketChannel(SocketChannel socket) {
        this(null, socket);
    }

    /**
     * Create a new instance
     *
     * @param parent    the {@link Channel} which created this instance or {@code null} if it was created by the user
     * @param socket    the {@link SocketChannel} which will be used
     */
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

The constructors delegate to one another until they reach super(parent, socket), which calls the parent class constructor. Here parent is null and socket is a java.nio.channels.SocketChannel obtained through the newSocket method:

private static SocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
         *
         *  See <a href="">#2308</a>.
         */
        return provider.openSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}

The provider used by the no-argument constructor is the default one:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

This is the standard NIO way of obtaining a provider; there is nothing special about it.

Back to super(parent, socket), which leads to the AbstractNioByteChannel constructor:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

This again calls the parent class, the AbstractNioChannel constructor:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

Here ch.configureBlocking(false) is called to put the NIO channel into non-blocking mode.

Before that, the parent class constructor is called first, which is the AbstractChannel constructor:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

It assigns parent to the corresponding field and then calls newUnsafe and newChannelPipeline to obtain the unsafe and pipeline objects.

For a client channel that has just been created, as in this example, parent is null.

Every Channel must be bound to a ChannelPipeline, and this is where that binding happens.

ChannelPipeline is analyzed later.


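"unsafe" refers to code that is not safe to call directly, usually because it touches low-level details. sun.misc.Unsafe in Java, which supports CAS operations, is one example. Netty has a concept of the same name, the Channel.Unsafe interface; it is unrelated to sun.misc.Unsafe and is intended for internal use.

Back in the AbstractChannel constructor, newUnsafe is called to obtain an unsafe object. It is an abstract method; for the path followed here, the implementation lives in AbstractNioByteChannel:

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioByteUnsafe();
    }

It returns a NioByteUnsafe object.


In Netty, Unsafe wraps the operations on the underlying socket, which the methods of the Unsafe interface also make clear (for example register, bind, connect, disconnect, close, beginRead, write and flush).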


ChannelPipeline initialization

Every Channel must be bound to a ChannelPipeline. A ChannelPipeline effectively maintains two chains of responsibility, one for inbound messages and one for outbound messages (both live in the same linked list). Back in the AbstractChannel constructor we saw the ChannelPipeline being initialized; the method is:

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

It instantiates a DefaultChannelPipeline, whose constructor is:



    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

First the NioSocketChannel passed in is stored in the channel field. In addition, DefaultChannelPipeline maintains a doubly linked list whose first and last nodes are head and tail.


The head node (HeadContext) implements both the outbound and the inbound handler interfaces and extends AbstractChannelHandlerContext.

The tail node is similar, but it only implements the inbound handler interface:

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, TailContext.class);
            setAddComplete();
        }

        // remaining ChannelInboundHandler methods omitted
    }
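The head/tail arrangement can be illustrated with a much smaller structure. The sketch below is a hypothetical simplification (PipelineSketch and Node are not Netty classes, and the nodes carry only a name, not a handler): it starts as head <-> tail and splices each new node in just before the tail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified pipeline: two sentinel nodes with user nodes linked in between.
final class PipelineSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    PipelineSketch() {
        // An empty pipeline is just head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // Insert a new node immediately before the tail sentinel.
    PipelineSketch addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    // Walk from head to tail, the direction in which inbound events travel.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch().addLast("ssl").addLast("echo");
        System.out.println(p.names()); // prints [head, ssl, echo, tail]
    }
}
```

Because the two sentinels always exist, addLast never has to special-case an empty list.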


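Back at the very beginning, an EventLoopGroup was instantiated before the Bootstrap was configured:

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    // ...

The object is created by calling the no-argument constructor of NioEventLoopGroup directly, so let's look at NioEventLoopGroup first: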



public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
    int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

    public NioEventLoopGroup(
        int nThreads,
        Executor executor,
        final SelectorProvider selectorProvider,
        final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

The chain ends at super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());

which is the constructor of MultithreadEventLoopGroup:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

Note the expression nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads: when the thread count is 0, the default thread count is used instead. That default is initialized when the class is loaded:

    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

The value comes from the system property io.netty.eventLoopThreads if it is set; otherwise it defaults to the number of CPU cores times 2. The result is then clamped to at least 1.
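The same rule can be reproduced with plain JDK calls. This is only a sketch: it uses Integer.getInteger and Runtime.availableProcessors in place of Netty's SystemPropertyUtil and NettyRuntime, and DefaultThreadsSketch is a hypothetical class name.

```java
final class DefaultThreadsSketch {
    // Mirrors the shape of the static initializer above using JDK equivalents.
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        // Integer.getInteger reads a JVM system property and returns the fallback
        // when the property is absent or cannot be parsed as an integer.
        int configured = Integer.getInteger("io.netty.eventLoopThreads", fallback);
        // Never go below one thread, even if the property is set to 0 or a negative value.
        return Math.max(1, configured);
    }

    public static void main(String[] args) {
        System.out.println(defaultEventLoopThreads());
    }
}
```

Starting the JVM with -Dio.netty.eventLoopThreads=4 would make the method return 4; without the property the result depends on the number of cores of the machine.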

Next comes the parent class constructor, which eventually reaches the constructor of MultithreadEventExecutorGroup:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        checkPositive(nThreads, "nThreads");

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

The core boils down to a few steps. First, the children array is allocated according to the number of threads:

children = new EventExecutor[nThreads];

Then, in the for loop, each EventExecutor is initialized by calling newChild.

Each EventExecutor here is an executor service in its own right: EventExecutor extends EventExecutorGroup, which in turn extends ScheduledExecutorService.


Back in MultithreadEventExecutorGroup, the code that follows sets the chooser:

chooser = chooserFactory.newChooser(children);

This is the newChooser method of DefaultEventExecutorChooserFactory:

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

Both choosers do the same job: whenever one is needed, they pick an EventExecutor from the EventExecutor array.
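The difference between the two choosers is only how the next index is computed. The following sketch shows one plausible way to do round-robin selection in both cases; it is an approximation written for illustration (ChooserSketch is hypothetical, it selects among plain strings, and it assumes a non-empty array), not a copy of Netty's classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch {
    interface Chooser {
        String next();
    }

    // A positive length is a power of two when exactly one bit is set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Power-of-two length: a bit mask replaces the modulo operation.
    static Chooser powerOfTwo(String[] executors) {
        AtomicInteger idx = new AtomicInteger();
        return () -> executors[idx.getAndIncrement() & (executors.length - 1)];
    }

    // Any other length: use modulo, and take the absolute value so the index
    // stays valid after the counter overflows into negative numbers.
    static Chooser generic(String[] executors) {
        AtomicInteger idx = new AtomicInteger();
        return () -> executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

    static Chooser newChooser(String[] executors) {
        return isPowerOfTwo(executors.length) ? powerOfTwo(executors) : generic(executors);
    }

    public static void main(String[] args) {
        Chooser c = newChooser(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 4; i++) {
            System.out.println(c.next()); // loop-0, loop-1, loop-2, loop-0
        }
    }
}
```

Either way the result is a round-robin walk over the array; the power-of-two variant simply avoids a division on each call.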

Netty's EventLoopGroup machinery is essentially built on MultithreadEventExecutorGroup. Take the newChild method: it is abstract there, and the concrete implementation is in NioEventLoopGroup:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        SelectorProvider selectorProvider = (SelectorProvider) args[0];
        SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
        EventLoopTaskQueueFactory taskQueueFactory = null;
        EventLoopTaskQueueFactory tailTaskQueueFactory = null;

        int argsLength = args.length;
        if (argsLength > 3) {
            taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
        }
        if (argsLength > 4) {
            tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
        }
        return new NioEventLoop(this, executor, selectorProvider,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

A new NioEventLoop object is created here. It is one of Netty's higher-level abstractions; underneath, it is simply an EventExecutor. In this example it is, concretely, a thread pool.

Now look at NioEventLoop.


The main point is that NioEventLoop derives from SingleThreadEventLoop, that is, a thread pool with a single thread. Its constructor is:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

Next, openSelector():

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }


unwrappedSelector = provider.openSelector();

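Everything after that line configures this Selector.

This shows that each event loop combines a single-threaded model with an NIO Selector to hold connections, exchange messages and so on.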

Channel registration

Back to the initAndRegister method:

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
}

After init has been called, execution reaches this line:

ChannelFuture regFuture = config().group().register(channel);

This registers the channel and leads to the register method of MultithreadEventLoopGroup:

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    public EventLoop next() {
        return (EventLoop) super.next();
    }

This in turn reaches the next method of MultithreadEventExecutorGroup:

    public EventExecutor next() {
        return chooser.next();
    }

In other words, the chooser's selection method is called to pick one EventExecutor, which is the event loop.

The call therefore ends up at the register method of NioEventLoop. That class does not override the method, so the one actually invoked is SingleThreadEventLoop's:

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

This reaches the register method of the unsafe object, which is ultimately the register method of AbstractUnsafe:

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

The key part is:

AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
            public void run() { register0(promise); }
        });
    } catch (Throwable t) { /* failure handling omitted */ }
}

inEventLoop() checks whether the current thread is the thread of this eventLoop. If it is, register0() is called directly; otherwise register0() is submitted to the eventLoop so that it runs on the loop's thread.

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        // See
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
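The thread hand-off that happens before register0 runs can be sketched with a single-threaded JDK executor. Everything in this sketch is an assumption made for illustration: LoopSketch is a hypothetical class, a CompletableFuture stands in for the ChannelPromise, and a channel is represented by its name only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class LoopSketch {
    private volatile Thread loopThread;

    // Single worker thread; the factory remembers it so inEventLoop() can compare against it.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "loop-sketch");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise queue the work on the loop.
    CompletableFuture<String> register(String channelName) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            register0(channelName, promise);
        } else {
            executor.execute(() -> register0(channelName, promise));
        }
        return promise;
    }

    private void register0(String channelName, CompletableFuture<String> promise) {
        promise.complete(channelName + " registered on " + Thread.currentThread().getName());
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        LoopSketch loop = new LoopSketch();
        // Called from the main thread, so the work is handed over to the loop thread.
        System.out.println(loop.register("ch-1").join()); // prints "ch-1 registered on loop-sketch"
        loop.shutdown();
    }
}
```

Funnelling every registration through the same thread is what lets the rest of the channel code run without locks.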

This eventually reaches the doRegister method, which AbstractNioChannel overrides:

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

The main thing it does is call javaChannel().register:

    protected SelectableChannel javaChannel() {
        return ch;
    }

javaChannel returns the underlying java.nio channel (here a java.nio.channels.SocketChannel) that the NioSocketChannel wraps.

Its register method is then called.

At this point the SocketChannel has been registered with the selector associated with the eventLoop.

To sum up, registering a Channel means associating the Channel with its EventLoop. This also reflects a basic rule in Netty: every Channel is tied to one specific EventLoop, and all I/O operations of that Channel are executed in that EventLoop. Once the Channel and the EventLoop are associated, the register method of the underlying Java NIO SocketChannel is called to register it with the designated selector. These two steps complete the registration of a Netty Channel.
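The second step, registering the JDK channel with a selector, can be tried with the JDK alone. The sketch below is a minimal illustration under stated assumptions: NioRegisterSketch is a hypothetical class, and a plain string stands in for the Netty channel that is passed as the attachment in doRegister.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class NioRegisterSketch {
    // Registers a non-blocking channel with an empty interest set and an attachment.
    static SelectionKey register(Selector selector, SocketChannel ch, Object attachment) throws IOException {
        // A selectable channel must be in non-blocking mode before it can be registered.
        ch.configureBlocking(false);
        // Interest set 0 means "no events yet"; the attachment travels with the key.
        return ch.register(selector, 0, attachment);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open(); SocketChannel ch = SocketChannel.open()) {
            Object channelStandIn = "netty-channel"; // stand-in for the Netty channel object
            SelectionKey key = register(selector, ch, channelStandIn);
            System.out.println(key.interestOps());                  // prints 0
            System.out.println(key.attachment() == channelStandIn); // prints true
            // Interest in reads can be switched on later, once the channel should actually read.
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}
```

Registering with an interest set of 0 separates "this channel belongs to this selector" from "this channel wants events", and the attachment lets the selector loop map a ready key back to its owning object.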

Handler registration

Back to where we started: to register a Handler, we pass in a ChannelInitializer object, like this:

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
        }
        //p.addLast(new LoggingHandler(LogLevel.INFO));
        p.addLast(new EchoClientHandler());
    }
});

Here is the handler method:

public B handler(ChannelHandler handler) {
    this.handler = ObjectUtil.checkNotNull(handler, "handler");
    return self();
}

It just stores the field. The field is eventually used in init:

    void init(Channel channel) {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());

        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());
    }

The handler itself is added to the pipeline here; config.handler() returns the handler object that was set on the bootstrap.

Now the code of ChannelInitializer itself:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
    // We use a Set as a ChannelInitializer is usually shared between all Channels in a Bootstrap /
    // ServerBootstrap. This way we can reduce the memory usage compared to use Attributes.
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
            new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

    /**
     * This method will be called once the {@link Channel} was registered. After the method returns this instance
     * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
     *
     * @param ch            the {@link Channel} which was registered.
     * @throws Exception    is thrown if an error occurs. In that case it will be handled by
     *                      {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default close
     *                      the {@link Channel}.
     */
    protected abstract void initChannel(C ch) throws Exception;

    @Override
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
            ctx.pipeline().fireChannelRegistered();

            // We are done with init the Channel, removing all the state for the Channel now.
            removeState(ctx);
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
            ctx.fireChannelRegistered();
        }
    }

    /**
     * Handle the {@link Throwable} by logging and closing the {@link Channel}. Sub-classes may override this.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
        }
        ctx.close();
    }

    /**
     * {@inheritDoc} If override this method ensure you call super!
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        initMap.remove(ctx);
    }

    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

    private void removeState(final ChannelHandlerContext ctx) {
        // The removal may happen in an async fashion if the EventExecutor we use does something funky.
        if (ctx.isRemoved()) {
            initMap.remove(ctx);
        } else {
            // The context is not removed yet which is most likely the case because a custom EventExecutor is used.
            // Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    initMap.remove(ctx);
                }
            });
        }
    }
}

The method to focus on is channelRegistered (as its own comment notes, handlerAdded normally runs the same initChannel logic first):

    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
            ctx.pipeline().fireChannelRegistered();

            // We are done with init the Channel, removing all the state for the Channel now.
            removeState(ctx);
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
            ctx.fireChannelRegistered();
        }
    }

It calls initChannel(ChannelHandlerContext), which indirectly calls the initChannel method we overrode, and then removes itself from the ChannelPipeline (the details are analyzed later).

To summarize Handler registration: we pass in a ChannelInitializer. ChannelInitializer extends ChannelInboundHandlerAdapter, so it is itself a Handler and is first stored in the ChannelPipeline like any other Handler. After the channel is registered, the abstract method we overrode is called, and that is where we add our own Handlers. Once that is done, the ChannelInitializer removes itself from the ChannelPipeline.
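The "install the real handlers, then remove yourself" pattern can be reduced to a few lines. The sketch below is a hypothetical model written for illustration only: InitializerSketch, Pipeline, Initializer and EchoHandler are not Netty types, and the pipeline is just an ordered list.

```java
import java.util.ArrayList;
import java.util.List;

final class InitializerSketch {
    interface Handler { }

    // A handler whose only job is to install the real handlers.
    interface Initializer extends Handler {
        void initChannel(Pipeline p);
    }

    static final class EchoHandler implements Handler { }

    // Hypothetical pipeline: just an ordered list of handlers.
    static final class Pipeline {
        final List<Handler> handlers = new ArrayList<>();

        void addLast(Handler h) {
            handlers.add(h);
        }

        // Called once the channel is registered: every initializer runs and is then removed.
        void fireRegistered() {
            // Iterate over a copy because initializers modify the live list.
            for (Handler h : new ArrayList<>(handlers)) {
                if (h instanceof Initializer) {
                    Initializer init = (Initializer) h;
                    try {
                        init.initChannel(this);
                    } finally {
                        // Remove the initializer even if initChannel failed.
                        handlers.remove(init);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        Pipeline p = new Pipeline();
        p.addLast((Initializer) pipeline -> pipeline.addLast(new EchoHandler()));
        System.out.println(p.handlers.size()); // prints 1 (only the initializer)
        p.fireRegistered();
        System.out.println(p.handlers.get(0) instanceof EchoHandler); // prints true
    }
}
```

The initializer is a placeholder: it occupies a slot in the pipeline until registration and leaves only the handlers it installed.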



回到 connect 方法的的调用,最终会来到 doResolveAndConnect 方法:

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
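
The shape of this method (continue immediately if registration has already finished, otherwise continue from a listener) can be sketched with the JDK's `CompletableFuture`. This is only an analogy under stated assumptions: `RegisterThenConnect` and `connectStep` are hypothetical names, and `CompletableFuture` stands in for Netty's `ChannelFuture` and `ChannelPromise`.

```java
import java.util.concurrent.CompletableFuture;

final class RegisterThenConnect {
    // Hypothetical stand-in for the next step (doResolveAndConnect0 in the article).
    static void connectStep(CompletableFuture<String> promise) {
        promise.complete("connected");
    }

    static CompletableFuture<String> registerThenConnect(CompletableFuture<Void> regFuture) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            // Fast path: registration already finished, so decide right away.
            if (regFuture.isCompletedExceptionally()) {
                // The future is complete, so this callback runs synchronously.
                regFuture.whenComplete((ignored, cause) -> promise.completeExceptionally(cause));
            } else {
                connectStep(promise);
            }
        } else {
            // Slow path: continue once registration completes.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    connectStep(promise);
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> result = registerThenConnect(pending);
        System.out.println("done before registration: " + result.isDone());
        pending.complete(null);
        System.out.println("result after registration: " + result.getNow(null));
    }
}
```

In both paths the caller gets a promise back immediately; only the moment at which the connect step runs differs.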

initAndRegister has already been analyzed, so next comes doResolveAndConnect0:

    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            AddressResolver<SocketAddress> resolver;
            try {
                resolver = this.resolver.getResolver(eventLoop);
            } catch (Throwable cause) {
                return promise.setFailure(cause);
            }

            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }

            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();

                if (resolveFailureCause != null) {
                    // Failed to resolve immediately
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }

            // Wait until the name resolution is finished.
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }
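
The "already resolved?" branch is easy to try with plain JDK types. A minimal sketch, assuming the remote address is an `InetSocketAddress` (the usual case for a TCP client); `ResolveCheck` and `nextStep` are hypothetical names used only for illustration.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

final class ResolveCheck {
    // Mirrors the decision above: connect directly when the address is
    // already resolved, otherwise resolve the host name first.
    static String nextStep(InetSocketAddress remote) {
        return remote.isUnresolved() ? "resolve-then-connect" : "connect";
    }

    public static void main(String[] args) {
        // createUnresolved never performs a DNS lookup.
        InetSocketAddress byName = InetSocketAddress.createUnresolved("example.invalid", 8007);
        InetSocketAddress byIp = new InetSocketAddress(InetAddress.getLoopbackAddress(), 8007);
        System.out.println(nextStep(byName));
        System.out.println(nextStep(byIp));
    }
}
```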

This eventually reaches doConnect:

    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            public void run() {
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }

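A task is submitted to the event loop, and that task calls channel.connect. The channel here is a NioSocketChannel, so the call is NioSocketChannel#connect. That class does not override the method, so the implementation actually invoked is the one in AbstractChannel: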

    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }

This delegates to the pipeline's connect method. The pipeline object here is a DefaultChannelPipeline, so we arrive at DefaultChannelPipeline#connect:

    public final ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }

This calls connect on the tail node of the linked list. tail is a TailContext, which does not override the method, so the call actually lands in AbstractChannelHandlerContext#connect:

    public ChannelFuture connect(SocketAddress remoteAddress) {
        return connect(remoteAddress, newPromise());
    }

    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return connect(remoteAddress, null, promise);
    }

    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");

        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null, false);
        }
        return promise;
    }
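
The `inEventLoop()` check is a general pattern: run the task inline when the caller is already on the loop thread, otherwise hand it over to that thread. Below is a minimal sketch of the idea using a single-threaded JDK executor; `ToyEventLoop`, `runInLoop`, and `demo` are hypothetical names, and this is not how Netty's `EventExecutor` is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ToyEventLoop {
    // Set by the thread factory when the single worker thread is created.
    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise enqueue the task.
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    static List<String> demo() {
        ToyEventLoop loop = new ToyEventLoop();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        loop.runInLoop(() -> {
            log.add("outer start");
            // Already on the loop thread, so this runs inline, before "outer end".
            loop.runInLoop(() -> log.add("inner"));
            log.add("outer end");
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        loop.shutdown();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If the nested call were always queued instead, "inner" would be logged after "outer end"; the inline path keeps the ordering a caller on the loop thread would expect.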

It first calls findContextOutbound(MASK_CONNECT) to find a handler:

    private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }

As shown, the search walks backwards from the current node until it finds a handler whose mask contains MASK_CONNECT.
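
The backward walk can be reproduced with a few lines of plain Java. This is a simplified sketch: `ToyContext` and its bit values are hypothetical, and unlike Netty's skipContext it ignores executors and looks only at the mask bit.

```java
final class ToyContext {
    // Hypothetical bit values, chosen only for this example.
    static final int MASK_BIND = 1;
    static final int MASK_CONNECT = 1 << 1;
    static final int MASK_WRITE = 1 << 2;

    final String name;
    final int executionMask;
    ToyContext prev;

    ToyContext(String name, int executionMask) {
        this.name = name;
        this.executionMask = executionMask;
    }

    // Walks towards the head until a context whose mask contains the bit is found.
    // Assumes the head handles every outbound operation, so the loop always ends.
    ToyContext findContextOutbound(int mask) {
        ToyContext ctx = this;
        do {
            ctx = ctx.prev;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

    // Builds head <- encoder <- logger <- tail and returns the tail.
    static ToyContext buildTail() {
        ToyContext head = new ToyContext("head", MASK_BIND | MASK_CONNECT | MASK_WRITE);
        ToyContext encoder = new ToyContext("encoder", MASK_WRITE);
        ToyContext logger = new ToyContext("logger", MASK_CONNECT | MASK_WRITE);
        ToyContext tail = new ToyContext("tail", 0);
        encoder.prev = head;
        logger.prev = encoder;
        tail.prev = logger;
        return tail;
    }

    public static void main(String[] args) {
        ToyContext tail = buildTail();
        System.out.println(tail.findContextOutbound(MASK_CONNECT).name);
        System.out.println(tail.findContextOutbound(MASK_BIND).name);
    }
}
```

In the echo client no user handler handles connect, so the same walk goes all the way from the tail to the head.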

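Now consider the head node, HeadContext. The rules for generating the mask are fairly involved: it is derived automatically from the handler type in ChannelHandlerMask. The key code is given here: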


Two mask constants are relevant here, and both include MASK_CONNECT:

    if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
        mask |= MASK_ALL_OUTBOUND;

        if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                        SocketAddress.class, ChannelPromise.class)) {
            mask &= ~MASK_BIND;
        }
        if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                        SocketAddress.class, ChannelPromise.class)) {
            mask &= ~MASK_CONNECT;
        }
        if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
            mask &= ~MASK_DISCONNECT;
        }
        if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
            mask &= ~MASK_CLOSE;
        }
        if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
            mask &= ~MASK_DEREGISTER;
        }
        if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
            mask &= ~MASK_READ;
        }
        if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                        Object.class, ChannelPromise.class)) {
            mask &= ~MASK_WRITE;
        }
        if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
            mask &= ~MASK_FLUSH;
        }
    }
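
The bit-clearing above can be tried out in isolation. The sketch below is not Netty code: `ToySkip`, `ToyOutboundAdapter`, `ConnectLogger`, and `ToyMask` are hypothetical names. It assumes the skippable check looks for a marker annotation on the method that reflection resolves for the handler type, which is how Netty 4.1's ChannelHandlerMask appears to work with its `@Skip` annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker: "this default implementation does nothing, skip it".
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ToySkip {
}

// Hypothetical adapter whose default methods are all skippable.
class ToyOutboundAdapter {
    @ToySkip
    public void bind(String address) {
    }

    @ToySkip
    public void connect(String address) {
    }
}

// Overrides connect only; the override does not carry the marker annotation.
final class ConnectLogger extends ToyOutboundAdapter {
    @Override
    public void connect(String address) {
        System.out.println("connecting to " + address);
    }
}

final class ToyMask {
    static final int MASK_BIND = 1;
    static final int MASK_CONNECT = 1 << 1;
    static final int MASK_ALL_OUTBOUND = MASK_BIND | MASK_CONNECT;

    static boolean isSkippable(Class<?> handlerType, String method, Class<?>... params) {
        try {
            // getMethod returns the most specific public method, so an override
            // without the annotation hides the annotated default.
            return handlerType.getMethod(method, params).isAnnotationPresent(ToySkip.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    static int mask(Class<?> handlerType) {
        int mask = MASK_ALL_OUTBOUND;
        if (isSkippable(handlerType, "bind", String.class)) {
            mask &= ~MASK_BIND;
        }
        if (isSkippable(handlerType, "connect", String.class)) {
            mask &= ~MASK_CONNECT;
        }
        return mask;
    }

    public static void main(String[] args) {
        System.out.println(mask(ToyOutboundAdapter.class));
        System.out.println(mask(ConnectLogger.class));
    }
}
```

The plain adapter ends up with an empty mask, while `ConnectLogger` keeps only the connect bit, so a backward search for connect would stop at it.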

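Mask bits are cleared according to whether the corresponding method is overridden: if a handler does not provide its own connect method, its MASK_CONNECT bit is removed. In the startup flow just described, HeadContext always has a connect method. So the backward search from the tail node eventually reaches the head node and calls HeadContext#connect: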

        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

Here unsafe is a NioByteUnsafe. That class does not override connect, so the call actually lands in AbstractNioUnsafe:

    public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }

        try {
            if (connectPromise != null) {
                // Already a connect in process.
                throw new ConnectionPendingException();
            }

            boolean wasActive = isActive();
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
            } else {
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;

                // Schedule connect timeout.
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                        public void run() {
                            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                            if (connectPromise != null && !connectPromise.isDone()
                                && connectPromise.tryFailure(new ConnectTimeoutException(
                                    "connection timed out: " + remoteAddress))) {
                                close(voidPromise());
                            }
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                promise.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            if (connectTimeoutFuture != null) {
                                connectTimeoutFuture.cancel(false);
                            }
                            connectPromise = null;
                            close(voidPromise());
                        }
                    }
                });
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

Next is doConnect. Note that AbstractNioUnsafe is an inner class of AbstractNioChannel, and doConnect is an abstract method of AbstractNioChannel. The implementation reached here is NioSocketChannel#doConnect:

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }
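
The `connected` flag comes from the JDK: in non-blocking mode `SocketChannel.connect` may return false, in which case the channel has to wait for OP_CONNECT and then call `finishConnect`. The following is a small plain-NIO sketch of that sequence against a throwaway server on the loopback interface; `NonBlockingConnectDemo` and `connectToLocalServer` are hypothetical names, and the retry bound of 10 is an arbitrary choice for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectDemo {
    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 lets the OS pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // May return true right away (common on loopback) or false if still in progress.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                // Same idea as interestOps(SelectionKey.OP_CONNECT) in doConnect above.
                client.register(selector, SelectionKey.OP_CONNECT);
                for (int attempt = 0; attempt < 10 && !connected; attempt++) {
                    selector.select(1000);
                    selector.selectedKeys().clear();
                    connected = client.finishConnect();
                }
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLocalServer());
    }
}
```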

bind and connect ultimately call the corresponding NIO bind and connect methods. In addition, doBind calls different methods depending on the Java version:

    protected void doBind(SocketAddress localAddress) throws Exception {
        doBind0(localAddress);
    }

    private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            SocketUtils.bind(javaChannel(), localAddress);
        } else {
            SocketUtils.bind(javaChannel().socket(), localAddress);
        }
    }

This completes the analysis of the Bootstrap client startup flow.