Netty流程简析 ServerBootstrap

一个最简单的Netty服务端程序如下

1
2
3
4
5
6
7
8
9
10
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TelnetServerInitializer());

b.bind(PORT);

这一段代码主要做了以下几个工作

  1. 将一个NioServerSocketChannel注册到Selector上
  2. 初始化NioServerSocketChannel对应的Pipeline, 主要是添加了一个ServerBootstrapAccepter用来处理之后的客户端请求.
  3. 启动mainReactor在Selector上监听Accept事件
  4. 为NioServerSocketChannel绑定一个端口

本文跟踪源码, 分析以上过程.

有如下两点说明

  1. 基于Netty 4.1.0
  2. 源码部分做了精简

Bind之前ServerBootstrap所以工作

1
2
3
4
b.group(bossGroup, workerGroup)                
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TelnetServerInitializer());
  1. ServerBootstrap.group(bossGroup, workerGroup)

    设置mainReactor和subReactor

  2. ServerBootstrap.channel()

    设置mainReactor所对应的channel, 这里是NioServerSocketChannel

  3. ServerBootstrap.handler()

    设置NioServerSocketChannel所对应Pipeline中的Handler, 后文中ServerBootstrap.init()方法会将该Handler添加到pipeline中

  4. ServerBootstrap.childHandler()

    为subReactor设置childHandler


0 ServerBootstrap.bind(PORT)

1
2
3
ChannelFuture bind(SocketAddress localAddress){
return doBind(localAddress);
}

0.1 ServerBootstrap.doBind(localAddress)

1
2
3
4
5
6
7
8
9
10
11
12
13
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();

if (regFuture.isDone()) {
doBind0(regFuture, channel, localAddress, promise);
} else {
regFuture.addListener{
doBind0(regFuture, channel, localAddress, promise);
}
}
return promise;
}

该方法主要做了两方面工作

  1. initAndRegister
  2. doBind0

关键是理解这其中到底发生了什么

0.1.1 AbstractBootstrap.initAndRegister()

1
2
3
4
5
6
ChannelFuture initAndRegister() {
final Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = group().register(channel);
return regFuture;
}

该方法做了三方面工作

  1. newChannel(), 这里是生成一个NioServerSocketChannel
  2. init(channel)
  3. group().register(channel):group是bossGroup

0.1.1.1 生成NioServerSocketChannel实例

查看源代码, 生成该对象时做了以下事情

  1. this.ch = SelectorProvider.openServerSocketChannel() //关联JDK中的Channel
  2. this.readInterestOp = SelectionKey.OP_ACCEPT //该Channel感兴趣的操作
  3. ch.configureBlocking(false)
  4. this.unsafe = new NioMessageUnsafe()
  5. pipeline = new DefaultChannelPipeline(this) //为该Channel关联一个Pipeline
  6. this.config = new NioServerSocketChannelConfig() //设置默认Config

对于以上有几点说明

  1. 生成DefaultChannelPipeline时, 会自动生成两个Handler, 一个是TailContext, 一个是HeadContext
  2. Unsafe类用来实现actual transport, 除了以下方法外, 必须从I/O thread中调用,

    • localAddress()
    • remoteAddress()
    • closeForcibly()
    • register(EventLoop, ChannelPromise)
    • deregister(ChannelPromise)
    • voidPromise()

0.1.1.2 ServerBootstrap.init(Channel)

最主要的代码是

1
2
3
4
5
6
7
8
9
10
11
12
13
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler,currentChildOptions currentChildAttrs));
}
});

该函数做以下工作

  1. 设置channel的option与attr
  2. 为NioServerSocketChannel所对应的Pipeline添加一个ChannelInitializer, 该initializer会在Channel向Selector注册时执行. 执行的效果是将一个ServerBootstrapAcceptor添加到Pipeline中.如果为ServerBootstrap设置了handler属性, 则将该handler添加到Pipeline中, 如设置LogHandler, 当有客户端接入时, 就生成一条Log

可见, 初始化阶段NioServerSocketChannle对应的Handler中只有一个ChannelInitializer, 注意这个Initializer是对NioServerSocketChannel的PipeLine进行初始化的, 初始化的时机是在NioServerSocketChannel向Selector注册之后

0.1.1.3 NioEventLoopGroup.register(channel)

该方法是将channel向EventLoop注册, 该方法经过以下步骤

1. SingleThreadEventLoop.register(Channel)

1
register(new DefaultChannelPromise(channel, this))

用一个Promise包装channel, 该Promise的两个重要成员

  1. NioServerSocketChannel
  2. NioEventLoop

可见注册也是IO操作, 也是一个异步任务

2. SingleThreadEventLoop.register(ChannelPromise)

1
promise.channel().unsafe().register(this, promise);

这里调用的是NioServerSocketChannel的unsafe()成员, 即NioMessageUnsafe的register方法

3. NioMessageUnsafe.register(EventLoop, ChannelPromise)

1
2
3
4
5
6
7
8
9
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});

此时, 还没有启动EventLoop, 所以执行else. 因为NioMessageUnsafe是NioServerSocketChannel的内部类, 所以这里register0是NioServerSocketChannel的方法.

4. NioServerSocketChannel.register0(ChannelPromise)

1
2
3
doRegister();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();

这里做三件事

  1. 执行注册
  2. 设置Promise成功
  3. 触发Registered事件. 此时, ServerBootstrap.init()中设置的ChannelInitializer会被激活, 用来初始化NioServerSocketChannel对应的Pipeline

5. NioServerSocketChannel.doRegister()

1
selectionKey = javaChannel().register(eventLoop().selector, 0, this);

这里javaChannel()返回的是JDK的ServerSocketChannel, eventLoop()为NioEventLoop.

从这里可以看出, 这是将Socket注册到selector上.

但这里的0的含义是什么, 为什么不是SelectionKey.OP_ACCEPT? (TODO)

至此注册完毕, 下一步应该bind端口了.


0.1.2 ServerBootstrap.doBind0(regFuture, channel, localAddress, promise)

这里的regFuture是表示注册任务的结果, channel为NioServerSocketChannel, promise是一个新的ChannelPromise, 说明这也是一个异步任务.

1
2
3
4
5
6
7
8
9
10
channel.eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListene(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});

即bind任务是发送到EventLoop的TaskQueue中. 而且由doBind方法可知, bind操作得在Register成功之后才行.

0.1.2.1 NioServerSocketChannel.bind(localAddress, promise)

该操作经过以下几步

1. DefaultChannelPipeline.bind(localAddress, promise);

2. TailHandler.bind(localAddress, promise)

3. HeadHandler.bind(ctx, localAddress, promise)

4. NioMessageUnsafe.bind(localAddress, promise)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
return;
}
promise.setSuccess();

if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

做了四件事

  1. 判断Channel是否是Active, 对于NioServerSocketChannel来说, Active的含义是Bind成功.
  2. 进行实际的Bind操作
  3. 设置Promise为成功或失败.
  4. 将触发Active事件放入TaskQueue中.

5. NioServerSocketChannel.doBind(localAddress)

1
javaChannel().socket().bind(localAddress, config.getBacklog());

6. 调用JDK serversocket的bind方法.

至此启动部分完成. 下一步就是等待客户端接入