Bootstrap Server.md

何言 2021年08月11日 61次浏览

ServerBootstrap

ServerBootstrap 是一个工厂类,用来初始化 Netty 服务端 。这里以 Netty 源码 example 中 echo 中的 代码入手来分析 服务端 的 ServerBootstrap 类 。

源码

来到 example/src/main/java/io/netty/example/echo/EchoServer.java,代码如下:

/**
 * Echo server example: configures a {@code ServerBootstrap}, binds it to {@link #PORT} and
 * blocks until the server channel is closed.
 *
 * <p>The presence of the {@code ssl} system property enables TLS with a self-signed
 * certificate; the {@code port} system property overrides the default port 8007.
 */
public class Main {

    // True when the "ssl" system property is set, whatever its value.
    static final boolean SSL = System.getProperty("ssl") != null;
    // Listening port read from the "port" system property; falls back to 8007.
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    /**
     * Builds the SSL context (optional), the two event loop groups and the bootstrap, then
     * binds and waits for the server channel to close.
     *
     * @param args not read by this method
     * @throws Exception propagated from SSL setup or from the blocking {@code sync()} calls
     */
    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            // null means "no TLS"; initChannel below checks for it.
            sslCtx = null;
        }

        // Configure the server.
        // bossGroup is created with an explicit size of 1; workerGroup uses the default size.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // One handler instance is created here and added to every child pipeline below.
        // NOTE(review): this requires EchoServerHandler to be shareable — confirm.
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // The SSL handler, when enabled, goes first in the pipeline.
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc()));
                            }
                            //p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast(serverHandler);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

可以看到初始化的步骤:(这里暂时不分析 SSL)

  1. 实例化一个 ServerBootstrap
  2. 设置两个 group,一个 bossGroup,一个 workerGroup
  3. 设置 channel
  4. 设置相关配置 option
  5. 设置 handler
  6. 设置 childHandler

而在 设置 childHandler 时,需要传入一个 Channel 初始化器,可以在其中获取到 channelPipeline 对象并在其中加入 Handler 。

EventLoopGroup

由客户端的分析可知,一个 EventLoop 继承于 SingleThreadEventExecutor,暂时可以认为和 JUC 中 SingleThreadExecutor 差不多 。

而 EventLoopGroup 相当于一个 EventLoop 池,每当有任务事件时,对应 EventLoopGroup 会从其池里选一个 EventLoop 使用 。

而对于服务器,其实际上需要维护两个 EventLoop 池,其中 BossGroup 用于接收连接,相当于 ServerSocket 中的 accept 任务,当接收到一个连接后,会将连接交给 WorkerGroup ,而 WorkerGroup 会从其池子中选出一个 EventLoop 用于维护该连接 。

而对于 BossGroup ,实际上只需要一个 EventLoop 即可完成,因此这里调用 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 生成一个只有一个 EventLoop 的 NioEventLoopGroup 。

而 WorkerGroup 则使用默认线程数:优先取系统属性 io.netty.eventLoopThreads 的值,未设置时为 CPU 核心数 * 2,且结果最小为 1 。

Handler

这里 Handler 也有两个,一个是 childHandler,一个是 handler 。其中 childHandler 用于处理与客户端的消息交互,handler 用于 bossGroup 中处理客户端的连接请求 。

NioServerSocketChannel 实例化

image20210809151337640.png

与 客户端 的类似,我们使用 channel 方法指定使用的 channel 为 NioServerSocketChannel:

    /**
     * Selects the channel implementation by class: wraps {@code channelClass} in a
     * {@code ReflectiveChannelFactory} and delegates to {@code channelFactory(...)}.
     *
     * @param channelClass the channel class; null-checked via {@code ObjectUtil.checkNotNull}
     * @return whatever {@code channelFactory(...)} returns
     */
    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

这里使用的也是 ReflectiveChannelFactory,因此会调用其无参构造方法:

/**
 * Creates a new instance on a server socket opened through
 * {@code newSocket(DEFAULT_SELECTOR_PROVIDER)}.
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

/**
 * Creates a new instance on a server socket opened through {@code newSocket(provider)}
 * using the given {@link SelectorProvider}.
 *
 * @param provider the provider handed to {@code newSocket}
 */
public NioServerSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

/**
 * Creates a new instance using the given {@link ServerSocketChannel}.
 *
 * <p>Invokes the superclass constructor with a {@code null} parent and
 * {@link SelectionKey#OP_ACCEPT} as the interest op, then builds the channel config from
 * the socket of {@code javaChannel()}.
 *
 * @param channel the NIO server channel to wrap
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

可以看到使用 newSocket 方法获取了一个 ServerSocketChannel,并套娃后最终传到父类构造方法:

/**
 * Opens a new {@link ServerSocketChannel} through the given provider.
 *
 * @param provider the {@link SelectorProvider} used to open the channel
 * @return the newly opened server socket channel
 * @throws ChannelException wrapping the {@link IOException} if the channel cannot be opened
 */
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        // Rethrown as an unchecked ChannelException; the original cause is preserved.
        throw new ChannelException(
            "Failed to open a server socket.", e);
    }
}

可以看到使用了 NIO 的方法获取到了一个 Channel 。

回到刚刚的构造方法,最终会使用 super 来到 AbstractNioMessageChannel 的构造方法:

/**
 * Forwards all three arguments unchanged to the superclass constructor.
 *
 * @param parent the parent channel (the server channel passes {@code null})
 * @param ch the underlying {@link SelectableChannel}
 * @param readInterestOp the interest op (the server channel passes {@code OP_ACCEPT})
 */
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

继续来到 AbstractNioChannel,这里和客户端的就是一样了:

/**
 * Stores the underlying NIO channel and its read interest op, then switches the channel to
 * non-blocking mode.
 *
 * @param parent forwarded to the superclass constructor
 * @param ch the underlying {@link SelectableChannel}
 * @param readInterestOp stored in {@code this.readInterestOp}
 * @throws ChannelException if non-blocking mode cannot be enabled; a close of {@code ch} is
 *         attempted first
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        // Best-effort cleanup: a failure to close is only logged so that the original
        // configureBlocking error is the one reported to the caller.
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

最终来到 AbstractChannel

    /**
     * Stores the parent and creates the channel's id, unsafe and pipeline through the
     * {@code newId()}, {@code newUnsafe()} and {@code newChannelPipeline()} factory methods.
     *
     * @param parent the parent channel, stored as-is (may be {@code null}, as passed by the
     *               server channel)
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

和客户端一样,这里 newChannelPipeline 还是 DefaultChannelPipeline

不过 newUnsafe 方法在 AbstractNioMessageChannel 中重写了:

   /**
    * Supplies the unsafe implementation for this channel type.
    *
    * @return a new {@code NioMessageUnsafe}
    */
   @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

也就是说对于服务器,其 unsafe 对象为 NioMessageUnsafe

image20210809152214225.png

此外关于 Channel 注册与 ChannelPipeline 初始化与客户端的过程一致 。

EventLoopGroup 注册

来到最初的方法,我们使用 group 方法指定了两个 EventLoopGroup

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)

来看看这个 group 方法:

/**
 * Sets the parent group through the superclass {@code group} method and stores the child
 * group in this bootstrap.
 *
 * @param parentGroup forwarded to {@code super.group(...)}
 * @param childGroup stored in {@code this.childGroup}; must not be {@code null}
 * @return this bootstrap, for chaining
 * @throws IllegalStateException if a child group has already been set
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    // The child group may only be assigned once.
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
    return this;
}

可以看到 我们的 bossGroup 实际上是 parentGroup,workerGroup 为 childGroup ,其中 parentGroup 通过调用父类的 group 方法传入,childGroup 则直接保存为成员变量 。

这里的 super 为 AbstractBootstrap,该类也是客户端启动类的父类,因此效果一致 。也就是说对于服务端来说,其 parentGroup 的注册过程和客户端一致,但 childGroup 是如何注册的呢? 我们来到 bind 方法,该方法会监听一个端口,也就是最终启动服务的方法:

经过周转最终会来到 AbstractBootstrap 中的 doBind 方法,如下:

/**
 * Creates and registers the channel via {@code initAndRegister()}, then hands it to
 * {@code doBind0} to bind to {@code localAddress} once registration has finished.
 *
 * @param localAddress the address passed on to {@code doBind0}
 * @return the registration future when it already carries a failure cause; otherwise the
 *         promise given to {@code doBind0} (or failed with the registration cause)
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        // Registration already failed: return the failed future directly.
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // Defer the bind until the registration future completes.
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

最终还是来到老朋友方法 initAndRegister:

/**
 * Creates a channel from {@code channelFactory}, initializes it with {@code init(channel)}
 * and registers it with the group returned by {@code config().group()}.
 *
 * @return the registration future; or, when creation or initialization throws, a promise
 *         already failed with that throwable
 */
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    // On a registration failure, close the channel: a regular close() if it did get
    // registered, otherwise a forced close through unsafe.
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

newChannel 方法与客户端一致,主要区别在于 init 。在 AbstractBootstrap 中该方法是抽象方法,而 ServerBootstrap 重写了该方法:

/**
 * Initializes the server channel: applies this bootstrap's options and attributes, then adds
 * a {@code ChannelInitializer} that installs the configured handler (if any) and a
 * {@code ServerBootstrapAcceptor} carrying the child group, handler, options and attributes.
 *
 * @param channel the freshly created server channel
 */
@Override
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    // Capture the child-side configuration in final locals for use inside the
    // anonymous classes below.
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            // The handler set through handler(...) is optional.
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // The acceptor is added from a task on the channel's event loop rather than
            // inline. NOTE(review): presumably so that it ends up after any handlers the
            // user's own initializer adds — confirm against upstream.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

可以看到这里给 channel 添加了一个 ChannelInitializer,其中为 ChannelPipeline 添加了一个 ServerBootstrapAcceptor,这个 ServerBootstrapAcceptor 中传入了 childGroup,也就是说 childGroup 会被 pipeline 中的一个 handler 持有 。在服务端,目前这条责任链处理的是客户端的连接请求,因此该 handler 会将新连接与 childGroup 中的某个 EventLoop 绑定 。

来看看这个 ServerBootstrapAcceptor :

image20210809155408121.png

是一个入站消息处理器,主要看其 channelRead 方法:

/**
 * Handles a read on the server channel: {@code msg} is cast to a {@link Channel} (the child),
 * which gets the child handler, options and attributes applied and is then registered with
 * {@code childGroup}.
 *
 * <p>If registration throws, or its future completes unsuccessfully, the child is handed to
 * {@code forceClose}.
 *
 * @param ctx the handler context (not used in this method)
 * @param msg the inbound message; cast to {@link Channel} without a type check
 */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // The child handler is added before the child is registered.
    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Asynchronous registration failure.
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        // Synchronous failure thrown by register(...) itself.
        forceClose(child, t);
    }
}

将 child handler 添加到 消息的 channel 中,同时调用 childGroup.register 将 channel 注册到 childGroup 中。

这里的 childHandler 就是我们一开始使用 childHandler 方法传入的 ChannelInitializer 。

至此我们可以得出结论了:

  1. 首先会有一个 服务接收 channel ,在 ParentGroup 中注册 。在通过 accept 接收到客户端的请求后,会生成一个新的 channel 作为消息在 接收 channelPipeline 中传输 。
  2. channelPipeline 中有一个 ServerBootstrapAcceptor 的 入站消息 handler,持有 childGroup 对象与 childHandler 的 ChannelInitializer 对象。
  3. 在收到 channel 消息后,将 channel 注册到 childGroup 中,并将 ChannelInitializer 加入 pipeline,ChannelInitializer 最终会执行我们重写的抽象方法 initChannel 后移除 。之后收到的客户端消息在 childGroup 中的 channel 里传播 。

image20210809163632541.png

image20210809163638843.png

Bind 过程

设置后,我们调用 bind 方法监听一个端口,其会调用 AbstractBootstrap#doBind 方法,回到该方法:

/**
 * Creates and registers the channel via {@code initAndRegister()}, then hands it to
 * {@code doBind0} to bind to {@code localAddress} once registration has finished.
 *
 * @param localAddress the address passed on to {@code doBind0}
 * @return the registration future when it already carries a failure cause; otherwise the
 *         promise given to {@code doBind0} (or failed with the registration cause)
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        // Registration already failed: return the failed future directly.
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // Defer the bind until the registration future completes.
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

在 initAndRegister 后,调用了一下 doBind0 方法,来看看:

/**
 * Submits a task to the channel's event loop that binds the channel when registration
 * succeeded, or fails {@code promise} with the registration cause otherwise.
 *
 * @param regFuture the registration future, inspected when the task runs
 * @param channel the channel to bind
 * @param localAddress the address to bind to
 * @param promise completed by the bind, or failed with {@code regFuture.cause()}
 */
private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // CLOSE_ON_FAILURE is attached as a listener on the bind future.
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

来到 channel.bind,这里的 channel 依然是 NioServerSocketChannel,不过最终调用的是 AbstractChannel 中的方法(doBind0 中调用的是带 promise 的重载,其逻辑与下面的单参数版本相同,都是委托给 pipeline):

    /**
     * Delegates the bind request to this channel's pipeline.
     *
     * @param localAddress the address to bind to
     * @return the future returned by {@code pipeline.bind(localAddress)}
     */
    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

来到 pipeline.bind,这里是 DefaultChannelPipeline#bind :

    /**
     * Starts the bind operation at the {@code tail} context of the pipeline.
     *
     * @param localAddress the address to bind to
     * @return the future returned by {@code tail.bind(localAddress)}
     */
    @Override
    public final ChannelFuture bind(SocketAddress localAddress) {
        return tail.bind(localAddress);
    }

来到 TailContext.bind。实际上是 AbstractChannelHandlerContext.bind

/**
 * Convenience overload: creates a promise with {@code newPromise()} and delegates to
 * {@code bind(localAddress, promise)}.
 *
 * @param localAddress the address to bind to
 * @return the future returned by the two-argument overload
 */
@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return bind(localAddress, newPromise());
}    

/**
 * Passes the bind request to the context returned by {@code findContextOutbound(MASK_BIND)},
 * invoking it directly when already on that context's executor and via {@code safeExecute}
 * otherwise.
 *
 * @param localAddress the address to bind to; must not be {@code null}
 * @param promise the promise for the operation; returned unchanged in every path
 * @return {@code promise}
 */
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // Already on the target executor: call through synchronously.
        next.invokeBind(localAddress, promise);
    } else {
        // Hand the call over to the target context's executor.
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}

往前找到带 MASK_BIND,也就是带 bind 方法的 节点,调用其 invokeBind 方法:

往前找最终会来到头节点,也就是 HeadContext,不过最终调用的是 AbstractChannelHandlerContext 的 invokeBind 方法:

    /**
     * Invokes {@code bind} on this context's handler when {@code invokeHandler()} is true;
     * otherwise forwards the request with {@code bind(localAddress, promise)}.
     *
     * @param localAddress the address to bind to
     * @param promise the promise for the operation; receives any throwable raised by the
     *                handler through {@code notifyOutboundHandlerException}
     */
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            // Handler is not to be invoked here: pass the request along instead.
            bind(localAddress, promise);
        }
    }

这里 handler() 方法在 HeadContext 中是返回 this 。

来到 HeadContext.bind:

/**
 * Hands the bind request to the channel's {@code unsafe}; {@code ctx} is not used.
 *
 * @param ctx the handler context (ignored)
 * @param localAddress the address to bind to
 * @param promise the promise passed on to {@code unsafe.bind}
 */
@Override
public void bind(
    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}

这里 unsafe 为 NioMessageUnsafe,最终来到 AbstractUnsafe 的 bind :

/**
 * Performs the bind through {@code doBind(localAddress)} and completes {@code promise}.
 *
 * <p>Returns early without binding when the promise cannot be marked uncancellable or
 * {@code ensureOpen(promise)} is false. If the channel becomes active as a result of the
 * bind, {@code fireChannelActive()} is scheduled through {@code invokeLater}.
 *
 * @param localAddress the address to bind to
 * @param promise set to success after a successful bind, or failed with the throwable
 *                raised by {@code doBind}
 */
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
            "A non-root user can't receive a broadcast packet if the socket " +
            "is not bound to a wildcard address; binding to a non-wildcard " +
            "address (" + localAddress + ") anyway as requested.");
    }

    // Remember the state before binding so the inactive -> active transition can be detected.
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        // channelActive is fired from a later task rather than inline.
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

会来到外部类的 doBind, 最终回到 NioServerSocketChannel 的 doBind 方法:

/**
 * Binds the underlying NIO server channel to {@code localAddress} with the backlog from
 * {@code config.getBacklog()}.
 *
 * <p>On Java 7 or newer the channel's own {@code bind} is used; on older versions the bind
 * goes through the channel's socket.
 *
 * @param localAddress the address to bind to
 * @throws Exception whatever the underlying bind call throws
 */
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}