Bootstrap Server.md
ServerBootstrap
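ServerBootstrap is a helper class, used much like a factory, that sets up and starts a Netty server. We analyze the server-side ServerBootstrap class here, starting from the echo example in the Netty source tree.
Source code
Open example/src/main/java/io/netty/example/echo/EchoServer.java. The code is as follows: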
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
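The initialization steps are as follows (SSL is not analyzed here for now):
- Instantiate a ServerBootstrap
- Set two groups: a bossGroup and a workerGroup
- Set the channel class
- Set the relevant options
- Set the handler
- Set the childHandler
When setting the childHandler, a channel initializer must be passed in. Inside it we can obtain the ChannelPipeline and add handlers to it.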
EventLoopGroup
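From the client-side analysis we know that an EventLoop implementation such as NioEventLoop ultimately extends SingleThreadEventExecutor. For now it can be thought of as similar to the single-thread executor returned by Executors.newSingleThreadExecutor() in java.util.concurrent.
An EventLoopGroup is effectively a pool of EventLoops: whenever a task or event arrives, the group picks one EventLoop from its pool to handle it.
A server actually maintains two EventLoop pools. The BossGroup accepts connections, which corresponds to the accept call on a ServerSocket. Once a connection is accepted it is handed to the WorkerGroup, and the WorkerGroup picks one EventLoop from its pool to service that connection.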
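The boss/worker split described above can be approximated with plain JDK classes. The following is a minimal sketch, not Netty's implementation: it uses blocking sockets and ordinary executors, and the class name BossWorkerSketch and the method echoOnce are made up for illustration. One "boss" thread only accepts, then hands the accepted socket to a "worker" chosen round-robin.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BossWorkerSketch {

    /** Starts a one-shot echo server, sends one message to it, and returns the reply. */
    static String echoOnce(String message) {
        // "Boss": a single thread whose only job is to accept connections.
        ExecutorService boss = Executors.newSingleThreadExecutor();
        // "Workers": a small pool of single-threaded executors that service connections.
        ExecutorService[] workers = {
            Executors.newSingleThreadExecutor(),
            Executors.newSingleThreadExecutor()
        };
        AtomicInteger next = new AtomicInteger();

        try (ServerSocket server = new ServerSocket(0, 100, InetAddress.getLoopbackAddress())) {
            boss.execute(() -> {
                try {
                    Socket accepted = server.accept();
                    // Hand the accepted connection to one worker, chosen round-robin.
                    ExecutorService worker = workers[next.getAndIncrement() % workers.length];
                    worker.execute(() -> echo(accepted));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });

            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
                client.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
                client.shutdownOutput(); // signal end of request so the worker sees EOF
                return new String(client.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            boss.shutdown();
            for (ExecutorService w : workers) {
                w.shutdown();
            }
        }
    }

    /** Worker-side logic: copy everything received back to the sender, then close. */
    private static void echo(Socket socket) {
        try (Socket s = socket) {
            s.getInputStream().transferTo(s.getOutputStream());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(echoOnce("hello")); // prints "hello"
    }
}
```

Netty replaces the blocking accept and per-connection blocking reads with selector-based event loops, but the division of labor is the same.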
The BossGroup only needs a single EventLoop to do its job, so the example calls EventLoopGroup bossGroup = new NioEventLoopGroup(1); to create a NioEventLoopGroup that contains exactly one EventLoop.
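The WorkerGroup uses the default thread count: the value of the io.netty.eventLoopThreads system property if it is set, otherwise CPU cores * 2, and never less than 1.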
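The default thread count rule can be written out with JDK calls only. This is a sketch of the calculation rather than Netty's own code: Netty uses its internal utilities to read the property and the processor count, while the version below assumes Integer.getInteger and Runtime.availableProcessors() are close enough for illustration. The class and method names are hypothetical.

```java
public class DefaultThreadsSketch {

    /** Approximates the default EventLoop count: property value or cores * 2, at least 1. */
    static int defaultEventLoopThreads() {
        int cores = Runtime.getRuntime().availableProcessors();
        // Integer.getInteger reads a system property and falls back to the given default.
        int configured = Integer.getInteger("io.netty.eventLoopThreads", cores * 2);
        return Math.max(1, configured);
    }

    public static void main(String[] args) {
        // Run with -Dio.netty.eventLoopThreads=4 to see the property take effect.
        System.out.println(defaultEventLoopThreads());
    }
}
```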
Handler
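There are two handlers here as well: childHandler and handler. The childHandler handles message exchange with connected clients, while the handler runs in the bossGroup and handles client connection requests.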
NioServerSocketChannel instantiation
As on the client side, the channel method specifies which channel class to use, here NioServerSocketChannel:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
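The idea behind a reflective factory can be shown without Netty. The sketch below is an assumption-laden stand-in for ReflectiveChannelFactory, not its source: it looks up the public no-argument constructor once and invokes it on every call. The class name ReflectiveFactorySketch is invented for this example.

```java
import java.lang.reflect.Constructor;

public class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> type) {
        try {
            // Resolve the public no-arg constructor up front so failures surface early.
            this.constructor = type.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(type + " has no public no-arg constructor", e);
        }
    }

    /** Creates a fresh instance each time, as a channel factory would per bind or connect. */
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<StringBuilder> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        System.out.println(first != second); // prints "true": every call builds a new object
    }
}
```

This is why the channel class passed to channel(...) needs a usable no-argument constructor.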
ReflectiveChannelFactory is used here as well, so the channel is created through the no-argument constructor of NioServerSocketChannel:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
The newSocket method obtains a ServerSocketChannel, which is passed along the chained constructors and finally to the superclass constructor:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
This uses the JDK NIO API to obtain the underlying channel.
Back in the constructor, the super call leads to the AbstractNioMessageChannel constructor:
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
Next comes AbstractNioChannel, which is the same as on the client side:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
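At the JDK level, the constructor chain up to this point boils down to two steps: open a ServerSocketChannel through a SelectorProvider and switch it to non-blocking mode. The sketch below shows only those two steps with standard java.nio calls. The class name NioOpenSketch and the describe helper are illustrative, and the check on validOps() is included only to show why OP_ACCEPT is the interest op passed to the superclass.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

public class NioOpenSketch {

    /** Opens a server channel the way newSocket does, makes it non-blocking, and reports its state. */
    static String describe() {
        try (ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel()) {
            // Same call AbstractNioChannel makes on the wrapped JDK channel.
            ch.configureBlocking(false);
            // A server socket channel supports exactly one selector operation: accept.
            boolean acceptOnly = ch.validOps() == SelectionKey.OP_ACCEPT;
            return "blocking=" + ch.isBlocking() + ", acceptOnly=" + acceptOnly;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "blocking=false, acceptOnly=true"
    }
}
```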
Finally we reach AbstractChannel:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
As on the client side, newChannelPipeline still creates a DefaultChannelPipeline.
The newUnsafe method, however, is overridden in AbstractNioMessageChannel:
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
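In other words, on the server side the unsafe object is a NioMessageUnsafe.
Apart from that, channel registration and ChannelPipeline initialization follow the same process as on the client side.
EventLoopGroup registration
Back in the original example, the group method is given two EventLoopGroups: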
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
Here is the group method:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
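Our bossGroup is the parentGroup and our workerGroup is the childGroup. The parentGroup is handed to the superclass through super.group(parentGroup), while the childGroup is stored directly in a field.
The super here is AbstractBootstrap, which is also the parent class of the client-side Bootstrap, so the effect is the same: on the server side, the parentGroup is registered exactly as on the client side. But how is the childGroup registered? To find out, look at the bind method, which listens on a port and is what finally starts the server.
After a few delegating calls, execution reaches the doBind method in AbstractBootstrap: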
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
This again leads to the familiar initAndRegister method:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
The newChannel call behaves the same as on the client side. The main difference is init: it is an abstract method in AbstractBootstrap, and ServerBootstrap overrides it:
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
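A ChannelInitializer is added to the channel's pipeline here. It in turn adds a ServerBootstrapAcceptor to the pipeline, and the ServerBootstrapAcceptor is constructed with the childGroup. In other words, the childGroup enters the pipeline wrapped in a handler. On the server side, this pipeline processes client connection requests, so this handler is where each accepted connection is bound to one EventLoop of the childGroup.
Let's look at ServerBootstrapAcceptor. It is an inbound handler, and the method of interest is channelRead: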
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
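It adds the childHandler to the pipeline of the channel that arrived as the message, and calls childGroup.register to register that channel with the childGroup.
The childHandler here is the ChannelInitializer we passed to the childHandler method at the start.
We can now draw the following conclusions:
- First there is a server (accepting) channel, registered with the parentGroup. When accept returns a client connection, a new channel is created and travels through the accepting channel's pipeline as a message.
- That pipeline contains a ServerBootstrapAcceptor, an inbound handler that holds the childGroup and the childHandler, which is our ChannelInitializer.
- When the acceptor receives the channel message, it registers the channel with the childGroup and adds the ChannelInitializer to the channel's pipeline. The ChannelInitializer eventually runs the initChannel method we implemented and then removes itself. After that, messages from the client propagate through the pipeline of the channel registered with the childGroup.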
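The three conclusions above can be traced with a toy model. Everything in this sketch is hypothetical (ToyChannel, ToyGroup, ToyAcceptor, and the string-based "pipeline" are stand-ins invented here, not Netty types), and it runs synchronously on one thread. It only illustrates the order of events: the acceptor receives the new channel, adds the initializer, registers the channel with a child loop, and the initializer installs the real handler and disappears.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AcceptorHandoffSketch {

    /** Toy channel: handler names stand in for the pipeline; loop is null until registered. */
    static final class ToyChannel {
        final List<String> pipeline = new ArrayList<>();
        String loop;
    }

    /** Toy child group: picks a loop round-robin and triggers the initializer on registration. */
    static final class ToyGroup {
        private final String[] loops;
        private int next;

        ToyGroup(String... loops) {
            this.loops = loops;
        }

        void register(ToyChannel ch, Consumer<ToyChannel> initializer) {
            ch.loop = loops[next++ % loops.length];
            initializer.accept(ch);            // one-shot setup, like initChannel
            ch.pipeline.remove("initializer"); // the initializer removes itself afterwards
        }
    }

    /** Toy acceptor: the accepted channel arrives as the "message" of channelRead. */
    static final class ToyAcceptor {
        private final ToyGroup childGroup;
        private final Consumer<ToyChannel> childInitializer;

        ToyAcceptor(ToyGroup childGroup, Consumer<ToyChannel> childInitializer) {
            this.childGroup = childGroup;
            this.childInitializer = childInitializer;
        }

        void channelRead(ToyChannel child) {
            child.pipeline.add("initializer");            // add the childHandler first
            childGroup.register(child, childInitializer); // then register with the child group
        }
    }

    /** Accepts two toy connections and reports where each one ended up. */
    static List<String> demo() {
        ToyGroup childGroup = new ToyGroup("worker-0", "worker-1");
        ToyAcceptor acceptor = new ToyAcceptor(childGroup, ch -> ch.pipeline.add("echoHandler"));
        List<String> result = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            ToyChannel child = new ToyChannel();
            acceptor.channelRead(child);
            result.add(child.loop + ":" + child.pipeline);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[worker-0:[echoHandler], worker-1:[echoHandler]]"
    }
}
```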
Bind process
After configuration we call bind to listen on a port, which ends up in AbstractBootstrap#doBind. Here is that method again:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
After initAndRegister, doBind0 is called:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
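The reason it is safe to submit the bind right after requesting registration is that both run on the same single-threaded event loop, so they execute in submission order. The sketch below demonstrates that ordering guarantee with a plain single-thread executor. It is an analogy under that assumption, not Netty code, and the task bodies merely record their names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderingSketch {

    /** Submits "register" then "bind" to one thread and returns the order in which they ran. */
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService loop = Executors.newSingleThreadExecutor(); // stands in for one EventLoop

        loop.execute(() -> log.add("register")); // queued first
        loop.execute(() -> log.add("bind"));     // queued second, so it runs second

        loop.shutdown();
        try {
            if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[register, bind]"
    }
}
```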
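Next is channel.bind. The channel here is still the NioServerSocketChannel, but the method actually invoked is the one in AbstractChannel (the single-argument overload is shown; the overload that takes a promise delegates to the pipeline in the same way):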
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
Next is pipeline.bind, that is, DefaultChannelPipeline#bind:
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
Next is TailContext.bind, which is actually AbstractChannelHandlerContext.bind:
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, newPromise());
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
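This searches toward the head of the pipeline for the next context flagged with MASK_BIND, that is, one whose handler implements bind, and calls its invokeBind method. The search eventually reaches the head node, HeadContext, and the method invoked is AbstractChannelHandlerContext's invokeBind: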
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
In HeadContext, the handler() method returns this.
So we arrive at HeadContext.bind:
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
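The tail-to-head walk traced above can be modeled with a tiny linked list. This is a toy sketch: the Ctx class, the mask value, and the handler names are all made up, and it assumes a pipeline resembling the echo example's server channel, where a logging handler takes part in bind and the acceptor is inbound-only. It shows how an outbound operation skips contexts that do not handle it and always ends at the head.

```java
public class OutboundWalkSketch {
    // Illustrative bit flag; not Netty's actual mask value.
    static final int MASK_BIND = 1;

    /** Toy context node: a name, the operations it handles, and a link toward the head. */
    static final class Ctx {
        final String name;
        final int mask;
        final Ctx prev;

        Ctx(String name, int mask, Ctx prev) {
            this.name = name;
            this.mask = mask;
            this.prev = prev;
        }
    }

    /** Walks toward the head until it finds a context that handles the given operation. */
    static Ctx findContextOutbound(Ctx from, int mask) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while ((ctx.mask & mask) == 0);
        return ctx;
    }

    /** Builds head <- logging <- acceptor <- tail and records which contexts see the bind. */
    static String demo() {
        Ctx head = new Ctx("head", MASK_BIND, null);
        Ctx logging = new Ctx("logging", MASK_BIND, head);
        Ctx acceptor = new Ctx("acceptor", 0, logging); // inbound-only, skipped for bind
        Ctx tail = new Ctx("tail", 0, acceptor);

        StringBuilder path = new StringBuilder();
        Ctx ctx = tail;
        while (ctx.prev != null) {
            ctx = findContextOutbound(ctx, MASK_BIND);
            if (path.length() > 0) {
                path.append(" -> ");
            }
            path.append(ctx.name);
        }
        return path.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "logging -> head"
    }
}
```

The head always handles bind, which is what guarantees the loop in findContextOutbound terminates.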
The unsafe here is the NioMessageUnsafe, and the call ends up in AbstractUnsafe's bind:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
This calls doBind on the enclosing channel class, which resolves to NioServerSocketChannel's doBind method:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
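Stripped of the Netty layers, the final step is a single JDK call: ServerSocketChannel.bind(address, backlog). The sketch below performs that call directly on the loopback interface with an ephemeral port. The class name JdkBindSketch and the helper bindEphemeral are illustrative, and the backlog of 100 simply mirrors the SO_BACKLOG value set in the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class JdkBindSketch {

    /** Binds a server channel to an ephemeral loopback port and returns the port chosen by the OS. */
    static int bindEphemeral(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port; backlog caps the pending-connection queue.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ((InetSocketAddress) ch.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindEphemeral(100));
    }
}
```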