Netty流程简析-接收客户端连接

服务端启动完成后, 启动了一个mainReactor, 它不断监听Selector上NioServerSocketChannel是否Accept就绪, 如果Accetp就绪, 要做如下工作

  1. 初始化一个NioSocketChannel
  2. 初始化NioSocketChannel对应的Pipeline,即将用户自定义的childhander添加到pipeline中, 这里一般是一个initializer
  3. 将NioSocketChannel注册到一个NioEventLoop中, 该NioEventLoop作为subReactor
  4. 触发channelRegister事件, initializer响应该事件, 从而初始化NioSocketChannel对应的Pipeline.

0 NioEventLoop.run()

1
2
3
4
5
for(;;) {
select();
processSelectedKeys();
runAllTask();
}

主要做三方面工作

  1. 选择就绪的Key, 这里指有客户端到来.
  2. 处理就绪Key
  3. 执行队列中其它任务

0.1 NioEventLoop.select()

选择就绪的Key, 这里指有客户端到来.

0.2 NioEventLoop.processSelectedKeysOptimized(selectedKeys)

1
2
3
4
5
6
7
for(int i = 0;;i++) {
final SelectionKey k = selectedKeys[i];
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
}

这里是对每一个准备好的Key进行处理.

注意, 以下这行代码

1
final Object a = k.attachment();

这里的a实际上是将NioServerSocketChannel注册到Selector上时的NioServerSocketChannel

0.2.1 NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch)

1
2
3
4
5
6
AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) {
unsafe.read();
}

这里的UnSafe就是NioServerSocketChannel的unsafe.

0.2.1.1 NioMessageUnsafe.read()

1
2
3
4
5
6
7
8
9
10
List<Object> readBuf = new ArrayList<Object>();

do{
int localRead = doReadMessages(readBuf);
} while(...)

for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}

这里第一个do循环是因为可能有多个用户连接进来了.之后调用NioServerSocketChannel的Pipeline

0.2.1.1.1 NioServerSocketChannel.doReadMessage(readBuf)

1
2
SocketChannel ch = javaChannel().accept(); 
buf.add(new NioSocketChannel(this, ch));

由此可以看出真正的accept操作是在这里发生的, 此时和客户端建立了链接, 并将该Channel包装成NioSocketChannel, 然后添加到readBuf中去,

0.2.1.1.2 DefaultChannelPipeline.fireChannelRead(NioSocketChannel)

这里的Pipeline是NioServerSocketChannel的Pipeline, 此时Pipeline中有一个ServerBootstrapAcceptor, 所以会传递到ServerBootstrapAcceptor的channelRead方法

0.2.1.1.3 ServerBootstrapAcceptor.channelRead(ctx, msg)

记得初始化ServerBootstrapAcceptor时, 该Acceptor有四个属性是由ServerBootstrap传过来的.

1
2
3
4
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;

这里的childGroup就是subReactor, childHandler就是

1
b.group(bossGroup, workerGroup).childHandler(new Initializer());

这个Initializer是用来初始化NioSocketChannel所对应的Pipeline的.

来看ServerBootstrapAcceptor的read方法.

1
2
3
4
5
6
7
8
9
10
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throwsException {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});

做了以下工作

  1. 将initializer添加到Pipeline
  2. 注册NioSocketChannel到某一EventLoop中, 而这个register操作就和之前将NioServerSocketChannel注册到NioEventLoop中一样了.

之后, 触发channelRegister事件, 执行initializer的initChannel方法, 进一步初始化NioSocketChannel所对应的pipeline.

至此, 接收客户端并注册到NioEventLoop的过程完毕.