Netty-理解Pipeline

前言

在运行过程中, 每一个NioSocketChannel对应的Pipeline实际是如下这样子

alt

仔细观察可以发现

  1. Pipeline内部实际上是一个双向链表, 每个元素实际上是一个ChannelHandlerContext
  2. 每一个Handler被一个ChannelHandlerContext所包装
  3. 该Pipeline中隐含着两个Context, 一个是HeadContext, 另一个是TailContext, 通过源代码可以看出, 这两个Context也是Handler.

在使用Pipeline中自己也有如下不解.

  1. ChannelHandlerContext的作用是什么, 为什么每一个Handler需要包装一个ChannelHandlerContext
  2. 数据是如何在不同Handler中传递的
  3. Handler在哪一个executor中执行呢? 可以为Handler指派不同的executor吗, 如果handler在不同的executor中执行, 那么数据又是怎么在handler中传递的呢?
  4. HeadContext和TailContext的作用是什么?
  5. InBound Event来OutBound Event到底是什么? 有什么不同呢? 在Handler中传递有什么区别呢?

本文通过分析源代码, 回答1, 2, 3问题

  1. Netty版本4.1.0
  2. 对代码做了精简

通过本文, 对以下

Channel, Pipeline, Handler, ChannelHandlerContext, InBound Event, OutBound Event有更深入的了解.


当将Handler添加到Pipeline中时, 最终调用的方法是

1
2
3
4
5
6
7
8
9
10
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;

newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx);
return this;
}

再看newContext方法

1
2
3
private AbstractChannelHandlerContext newContext(EventExecutorGroup group,String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group),name, handler);
}

该方法主要做了两件事

  1. 将handler包装一个ChannelHandlerContext
  2. 从group中取得一个childExecutor, 赋值给DefaultChannelHandlerContext的executor成员
  3. 根据该handler是InBoundHandler还是OutBoundHandler为该Context设置inbound或outbound

其中DefaultChannelHandlerContext的executor的含义是.

执行所包装的Handler的executor

所以可以看出:

Handler可以在不同的executor中执行, 如果不指定, 则该executor是NioEventLoop, 这就是默认情况下Handler在NioEventLoop中执行.

再看addLast0()方法

1
2
3
4
5
6
7
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

可以看出, 就是将该Context加入到双向队列中去.

我们知道, 在Handler处理完数据后, 要想将数据传递到后一个Handler中, 要调用Context的相关方法, 假如这里一个InBoundHandler处理完了一个数据, 调用了

1
ctx.fireChannelRead(msg);

这个方法将msg传递到下一个handler中, 而前面已经知道, 下一个handler可能运行在另一个executor中, 那么解答不同exexutor中handler间数据的传递就在这个方法中了. 下面看这个方法做了什么.

1
2
3
4
5
6
public ChannelHandlerContext fireChannelRead(final Object msg) {

invokeChannelRead(findContextInbound(), msg);

return this;
}

先调用的findContextInbound()

1
2
3
4
5
6
7
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}

可以看出, 该方法的含义是找到当前handler后的第一个InBoundHandler

再看invokeChannelRead()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void invokeChannelRead(final AbstractChannelHandlerContext next,Object msg) {

EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

现在真相大白了, 先返回下一个handler的executor, 然后利用

1
executor.inEventLoop()

判断当前handler的executort和下一个Handler的executor是不是相同, 如果是, 就直接在当前executor中执行, 如果不是, 则打包成一个Task, 加入到下一个executor的TaskQueue中执行.

而next.invokeChannelRead(msg)是

1
2
3
private void invokeChannelRead(Object msg) {
((ChannelInboundHandler) handler()).channelRead(this, msg);
}

即, 调用handler的channelRead方法, 至此, msg传入到了下一个handler中

所以

如果两个handler在不同executor中执行, 那么将msg传递到下一个handler是通过TaskQueue来进行了.

现在, 可以总结ChannelHandlerContext的作用了.

它将与处理数据无关的职能从handler中剥离了出去, 用来管理数据在pipeline中的传递.

通过以上, 其实已经回答了问题1, 2, 3. 下面回答4, 5


TailContext, HeadContext的作用

由图中可知, 两者是netty帮程序员自动添加的. 两者有如下特点

  1. 既是Context, 又是Handler
  2. HeadContext是InBoundHandler和OutBoundHandler, 而TailContext只是InBoundHandler

那么, 为什么这样设计呢?

在解释这个之前, 需要了解一下Inbound Event和OutBound event. 暂时记住以上两个两个结论

InBound Event, OutBound Event

在ChannelPipeline的文档中, 有如下这样描述

Inbound event propagation methods

1
2
3
4
5
6
7
8
9
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

Outbound event propagation methods:

1
2
3
4
5
6
7
8
ChannelOutboundInvoker.bind(SocketAddress, ChannelPromise)
ChannelOutboundInvoker.connect(SocketAddress, SocketAddress,ChannelPromise)
ChannelOutboundInvoker.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelOutboundInvoker.disconnect(ChannelPromise)
ChannelOutboundInvoker.close(ChannelPromise)
ChannelOutboundInvoker.deregister(ChannelPromise)

可以看出, 这些方法是ChannelHandlerContext的方法

而Pipeline中也有类似的方法

1
2
ChannelPipeline.fireChannelRead(Object)
ChannelPipeline.write(Object)

而如果去看AbstractChannel, 则有

1
AbstractChannel.write(Object)

而没有fire开头的方法

再看AbstractUnsafe, 也有

1
AbstractUnsafe.write(Object, ChannelPromise)

没有fire开头的方法

同时, 对于AbstractChannel, 有

1
AbstractChannel.doWrite(ChannelOutboundBuffer in)

没有fire开头的方法.

这有些混乱.

如果前以上模糊的地方整理一下, 有以下几点

  1. ChannelHandlerContext, Channel, Unsafe, Pipeline有一些类似的方法
  2. 有的类中有fire前缀的方法和非fire前缀的方法, 有的类中只有非fire前缀的方法
  3. 同一非fire前缀的方法, 出现在多个类中. 如write方法
  4. 同一非fire前缀的方法, 出现的多个类中, 参数有不同, 如ChannelPipeline.write(Object), AbstractUnsafe.write(Object, ChannelPromise)
  5. 同一非fire前缀的方法, 出现的多个类中, 但名称也有些不同, 如AbstractChannel.write(Object)和AbstractChannel.doWrite(ChannelOutboundBuffer in)

下面就来理一理, 澄清以上的困惑, 进而也就回答了问题4, 5

1. fire前缀的方法

fire前缀的方法叫InBound Event propagation methods.

什么叫InBound Event呢? 就是被动事件, 这里被动的含义指Netty程序根据事件被动的作出反应, 而不是主动做出反应.

举个例子, NioEventLoop一直在Selector上进行监听, 当一个Channel在Selector上读就绪时, 触发一个Read事件, 然后就调用fireChannelRead(Object)方法在Pipeline中传递这个事件, 利用handler对此逐步处理.

如果看源码可以发现, 当NioEventLoop从Socket读入数据后, 会调用Pipeline的fireChannelRead方法, 而该方法在Pipeline与ChannelHandlerContext中都有, 有什么区别呢?

看一下Pipeline.fireChannelRead的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}

static void invokeChannelRead(final AbstractChannelHandlerContext nextObject msg) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

可以看出,

PipeLine的fire方法仅仅是将数据传递给了HeadContext而已

而HeadContext的executor是NioEventLoop, 所以是在NioEventLoop中执行的, 再来看一下HeadContext对读进来的数据做了什么操作

1
2
3
public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
ctx.fireChannelRead(msg);
}

很简单,

就是传递给下一个Handler

对于其它InBound也是类似的含义.

至此, 澄清了

Pipeline与ChannelHandlerContext中fire方法的区别, 因为第一个Handler之前没有Handler, 所以依赖Pipeline的fire方法传递给第一个Handler

2. 非fire前缀方法

ChannelHandlerContext中非fire前缀方法叫做Outbound event propagation methods, 这里Outbound event可以称为主动事件, 什么是主动事件呢?

从Channel读是一个被动事件, 这是因为服务端得等待着客户端数据的到来, 服务端不能主动做动作.

而向Channel写则是一个主动事件, 因为服务端可以决定什么时候来写. 这是一个主动的行为.

常用的写事件如下

  1. bind(SocketAddress, ChannelPromise)
  2. connect(SocketAddress, SocketAddress, ChannelPromise)
  3. write(Object, ChannelPromise)
  4. flush()
  5. read()
  6. disconnect(ChannelPromise)
  7. close(ChannelPromise)
  8. deregister(ChannelPromise)

从名称中可以看出, 这些行为都可以是主动行为, 但有的行为即可以主动, 又可以被动, 如close, 如果说等待客户端的close则是一种被动行为, 只是名称不同, fireChannelInactive.

这里的read可能不好理解, read怎么可能是一个主动行为呢? 之后会对此进行解释.

关于非fire前缀的方法, 有三点需要注意

  1. 主动事件是如何在Pipeline中传播的
  2. 方法中的Promise是做什么用的, 为什么fire方法没有Promise
  3. 非fire前缀方法出现在多个类有, 有什么区别

下面通过分析将数据写入到Channel的流程, 就可以回答以上的几个问题

write操作的传播

假如从客户端读入数据后, 在某一InBoundHandler中处理完毕, 此时需要写回到Channel中, 于是在该Handler的channelRead方法中执行如下代码.

1
2
ChannelFuture future = ctx.writeAndFlush(response);
future.addListener(ChannelFutureListener.CLOSE);

该段代码的含义是将响应发送给客户端后, 关闭Channel, 现在, 深入源码看都发生了什么

先是DefaultChannelHandlerContext.write(Object)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}

public ChannelFuture write(final Object msg, final ChannelPromise promise){
write(msg, false, promise);
return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}

通过以上可以看出

  1. 调用write会隐式传入一个Promise, 而我们知道, Promsie代表一个异步任务的结果, 所以这里writeAndFlush操作是一个异步任务, 即不会阻塞当前executor, 而且在写成功后会修改Promise为success, 进而通知listener
  2. 其传播方式与fire是类似的, 先找到下一个OutBoundHandler, 看是不是在同一个executor中, 不是的话就加入到executor的taskqueue中. 不过, 这里传播的是两个事件, write 和 flush, 会依次调用Handler中的这两个方法

由上知道 , 第一个OutBoundHandler是HeadContext, write操作最终会传播到HeadContext, 现在来看HeadContext的write和flush方法做了什么.

1
2
3
4
5
6
7
8
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromisepromise) throws Exception {
unsafe.write(msg, promise);
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}

这里的UnSafe是NioSocketChannel关联的UnSafe, 即NioSocketChannelUnsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}

@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}

从以上代码中可以看出, 这两个方法必须在NioEventLoop中执行, write方法只是将msg写入到缓冲区, 并没有修改promise和发生真正的写Channel操作, 这说明可能这两个操作发生在flush()中.

查看flush0

1
2
3
protected void flush0() {
doWrite(outboundBuffer);
}

该方法调用doWrite, 而doWrite是UnSafe所在Channel的abstract方法, 这里的NioSocketChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int size = in.size();
if (size == 0) {
clearOpWrite();
break;
}
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel(); //关联SocketChannel

switch (nioBufferCnt) {
case 0:
super.doWrite(in);
return;
case 1:
ByteBuffer nioBuffer = nioBuffers[0];
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
final int localWrittenBytes = ch.write(nioBuffer); //真正的写Channel
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
default:
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
final long localWrittenBytes = ch.write(nioBuffers, 0,nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
}
in.removeBytes(writtenBytes);
if (!done) {
incompleteWrite(setOpWrite);
break;
}
}
}

该方法很长, 只看有注释的地方, 可以看出真正的将数据写入Channel是发生在这里, 而且是利用JDK的Socket接口.

那promise呢? 进入in.removeBytes(writtenBytes)看一看

1
2
3
4
5
public void removeBytes(long writtenBytes) {
for (;;) {
remove();
}
}

再看remove()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;

ChannelPromise promise = e.promise;
int size = e.pendingSize;

removeEntry(e);

if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}

// recycle the entry
e.recycle();

return true;
}

这里总算可以看到safeSuccess(promise)将promise设置为完成了.

至此, 一个write方法的执行完成了, 始于handler调用, 终于JDK socket的write().

结合以上分析与fire方法分析, 可以澄清一些东西了.

  1. fire方法是InBound event的传播方法, 所以只有Pipeline和ChannelHandlerContext有, Pipeline的方法只是用于将event传给第一个Handler.
  2. fire方法没有Promise, 它只是用来将数据在Pipeline中不断传递和处理.
  3. 非fire的方法分为两类, 一类是传播方法, Pipeline和ChannelHandlerContext中的方法就是传递方法, Pipeline的非fire方法用来传递给TailContext, 与数据传入不同, 数据传入肯定会经过HeadContext, 而数据的传出肯定会经过HeadContext, 但不一定经过TailContext, 只要从中间某一个Handler调用这个方法即可. 另一类是非传递方法即Unsafe中的方法, 此方法利用相关联的Channel中的do开头相关方法, 进行真实的IO操作.
  4. 可以推断, HeadContext作为非fire方法传递的终点站, 会调用Unsafe的方法
  5. 非fire方法有的有promise参数, 有的没有, 其实都会传入一个promise参数, 其含义是要得到操作结果的一个通知, 通常这是有意义的, 如write操作完成后关闭Channel
  6. Channel中的非fire方法, 如bind, 实际上调用的是pipeline的bind方法. 这里只是提供了一个方便的接口吧.
  7. Channel中do开头的方法, 是利用所关联的JDK Channel进行实际的操作

另:OutBound中read方法是做什么用的?

现在已经知道, OutBound事件最终用HeadContext处理

1
2
3
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}

最终走到NioSocketChannel的

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

反正是做了以上工作, 这里自己也搞不懂什么作用, 以后再补充吧(TODO)

至此, 开始的5个问题都已解决了.