Netty异步回调机制

前言

本文先介绍什么是异步回调, 接着给出一种简单实现 , 然后分析Netty中的实现以及使用

什么是异步回调呢?

如果在网上查什么是异步回调, 很多人会举现实生活中的例子, 如下面这个例子

你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下”,然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。
而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。

如果说的更专业一些, 像下面这样

异步调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

看到以上两种说法, 似乎对异步机制懂了那么一点点, 对非程序员来说, 应该是足够的了, 但是我还是好奇, 如何在程序中实现呢? 比如说被调用者到底是如何将结果传递给调用者呢? 调用者在程序中指的又是什么呢?

下面就介绍一下Netty中是如何实现这样一种异步机制的.

在介绍代码前, 这里也要举一个例子来通俗形象的解释异步机制, 然后用代码来说明这个比喻.

一个比喻

先看一张图

alt

这里小王不断的从Task队列中取出一个Task, 如果队列为空, 那么小王就什么也不做, 如果该Task是一个耗时任务, 而小王执行该任务的话, 后面的Task会得不到执行, 于是, 小王可以将Task交给小李执行, 这样, 小王就可以继续执行下一个Task了, 而小李执行完毕后, 将执行结果作为了一Task放入到小王的任务队列中去, 这样, 当小王执行到该任务时, 也就得到了结果.

如果上面的解释依然抽象的话, 可以放入一个具体场景. Task可以看成是一个数学题, 这个数学题有难有易, 易的耗时少, 难的耗时多, 如果小王卡在难题上, 那么容易的得就没时间做了, 此时, 可以把难题交给小李, 这样, 小王可以去做容易的题, 小李做完后, 把结果交给小王(作为一个任务插入到小王的任务队列中)

当然, 以上所述还是有些不准确. 在后面的实现过程中, 就可以了解到不准确是在什么地方的, 这里, 能理解以上就可以了.

下面是对以上模型的一个简单实现

一个简单实现

首先要在程序语言中找到以上概念的对应实体, 由以上描述可知, 有如下概念

  1. 小王, 小李
  2. 小王, 小李的执行
  3. 任务
  4. 任务队列
  5. 将任务提交到任务队列

对于以上概念, 可以有以下实体

  1. 线程
  2. 线程的执行
  3. Runnable
  4. BlockingQueue
  5. 将Runnable差入到BlockQueue中

定义如下这样一个Thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Person extends Thread {

BlockingQueue<Runnable> taskQueue; //任务队列
public Person(String name) {
super(name);
taskQueue = new LinkedBlockingQueue<>();
}

@Override
public void run() {
while(true) { //无限循环, 不断从任务队取任务
try {
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public void submit(Runnable task) { //将任务提交到任务队列中去
taskQueue.offer(task);
}
}

而对于其使用呢? 使用之前做数学题的例子.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void main() {

final Person wang = new Person("wang");
final Person li = new Person("li");
li.start(); //启动小王
wang.start(); //启动小李
wang.submit(new Runnable() { //提交一个简单的题
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 1. 这一道简单的题");
}
});

wang.submit(new Runnable() { //提交一个复杂的题
@Override
public void run() {
li.submit(new Runnable() { //将复杂的题交给li来做
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +" 2. 这是一道复杂的题");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
wang.submit(new Runnable() { //做完之后将结作为Task返回给wang
@Override
public void run() {
System.out.println(Thread.currentThread(.getName() + "复杂题执行结果");
}
});
}
});
}
});

wang.submit(new Runnable() { //提交一个简单的题
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 3. 这一道简单的题");
}
});
}

执行结果是

wang 1. 这是一道简单的题
wang 3. 这是一道简单的题
li 2. 这是一道复杂的题
wang 复杂题执行完毕

可以看到第2道难题完全没有影响到1, 3题的执行.

不过, 为什么提交复杂的题代码会这么复杂呢?

可以这样认为, wang的执行肯定是执行某一个任务. 提交复杂的题给li这一操作肯定也是属于某一任务的, 这里wang不可能越过任务直接将复杂的题提交给li. 所以需要将提交操作用一个任务包装起来, 同理li的执行结果也以同样的方式提交给wang.

Netty中的实现

先看一下在Netty中实现以上场景的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final DefaultEventExecutor wang = new DefaultEventExecutor();
final DefaultEventExecutor li = new DefaultEventExecutor();
wang.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 1. 这是一简单的题");
}
});

wang.execute(new Runnable() {
@Override
public void run() {
final Promise<Integer> promise = wang.newPromise();
promise.addListener(new GenericFutureListener<Future<? superInteger>>() {
@Override
public void operationComplete(Future<? super Integer> future)throws Exception {
System.out.println(Thread.currentThread().getName() + "复题执行结果");
}
});
li.execute(new Runnable() {
@Override
public void run() {
System.out.println("执行计算任务的线程 " +Thread.currentThread());
promise.setSuccess(10);
}
});
}
});

wang.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 3. 这是一简单的题");
}
});

看起来和简单实现中的代码差不多, DefaultEventExecutor可以简单的看做拥有一个队列的线程.

与简单实现不同的是, 小李执行完任务后通知小王的方式. 即下面这段代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final Promise<Integer> promise = wang.newPromise();
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throwsException {
System.out.println(Thread.currentThread().getName() + "复杂题执行结");
}
});

li.execute(new Runnable() {
@Override
public void run() {
System.out.println("执行计算任务的线程 " + Thread.currentThread());
promise.setSuccess(10);
}
});

在Netty中Promise代码一个可写的异步任务结果, 以上代码的含义是

  1. 生成一个promise
  2. 为该promise注册一个listener, 当任务执行完后回调该listener
  3. 在另一个线程中执行一个异步任务, 执行完后, 将promise设置为成功
  4. 回调listener, 该listener在异步任务提交者线程中执行.

下面, 从源代码分析一下, 上术过程是如何做到的.

先看promise.setSuccess

1. DefaultPromise.setSuccess(V result)

1
2
3
4
if (setSuccess0(result)) {
notifyListeners();
return this;
}

关键的是notifyListeners();

2. DefaultPromise.notifyListeners()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void notifyListeners() {

EventExecutor executor = executor();
if (executor.inEventLoop()) {
notifyListenersWithStackOverFlowProtection();
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
notifyListenersWithStackOverFlowProtection();
}
});
}
}

3. DefaultPromise.executor()

1
2
3
protected EventExecutor executor() {
return executor;
}

由2和3知道, 如果当前线程不是executor, 就将notifyListener包装成一个Task添加了executor的taskqueue中执行, 如果是executor则直接在当前线程中执行.

从上面的行为可以猜得到, executor成员的含义应该是异步任务的提交者, 那么executor是什么时候被赋值呢?

4. DefaultEventExecutor.newPromise();

1
2
3
4
5
6
7
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}

public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}

可见, 利用DefaultEventExecutor生成promise时, 将该executor赋值给promise.executor.

至此, 明白了Netty中异步任务执行完毕后, 通知提交者的机制

一种误用

为了实现以上问题, 还可以像下面这样写.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
wang.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 1. 这是一简单的题");
}
});

wang.submit(new Runnable() {
@Override
public void run() {

Future<String> result = li.submit(new Callable<String>() {
@Override
public String call() throws Exception {
for(int i = 0; i <= 10000000; i++){
for(int j = 0; j <= 1000000; j++) {
;
}
}
System.out.println(Thread.currentThread().getName() + " 2. 这一道复杂的题");
return null;
}
});
result.addListener(new GenericFutureListener<Future<? super String>>(){
@Override
public void operationComplete(Future<? super String> future)throws Exception {
System.out.println(Thread.currentThread().getName() + "3.复杂题执行结果");
}
});
}
});

wang.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 3. 这是一简单的题");
}
});

这样写似乎更简单, 但运行一下会发现, listener的执行却是由小李来处理, 按理说, 小王交给小李一个任务, 小李做完之后将结果返回给小王, 应该是小王处理才对啊, 可为什么是小李来处理呢?

查看源码可知, DefaultEventExecutor.submit方法将一个Callable包装成一个DefaultPromise, 并且将执行者作为DefaultPromise的exectutor, 为什么要这样做呢?

我猜是如下这样的原因.

Netty的异步回调机制需要提交者必须有一个TaskQueue才行, 而这里wang并不一定含有一个TaskQueue, 为了防止因为提交者没有TaskQueue而出错, 所以只能赋值为执行者, 而使用newPromise就没有问题, 因为newPromise是DefaultEventExecutor的接口, 而DefaultEventExecutor肯定有一个TaskQueue.

比如如下情况就可能出错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Thread wang = new Thread(new Runnable() {
@Override
public void run() {

Future<String> result = li.submit(new Callable<String>() {
@Override
public String call() throws Exception {
for(int i = 0; i <= 10000000; i++){
for(int j = 0; j <= 1000000; j++) {
;
}
}
System.out.println(Thread.currentThread().getName() + " 2.这是一道复杂的题");
return null;
}
});
result.addListener(new GenericFutureListener<Future<? superString>>() {
@Override
public void operationComplete(Future<? super String> future)throws Exception {
System.out.println(Thread.currentThread().getName() + "3.复杂题执行结果");
}
});
}
});

Netty源码中对异步回调的使用

在Netty中, ChannelHandlerContext的

  1. write(msg, promise)
  2. bind(address, promise)

等操作都是一个异步操作, 为了使该操作不阻塞当前executor的执行, 一般这样使用

1
2
3
Promise result = ctx.newPromise(ctx.executor());
ctx.write(msg, result);
result.addListener(listener);