上篇文章 简单介绍了一下如何使用Vert.x部署一个Verticle并创建一个http server。以及路由的试用。在部署Verticle的时候,我们使用的 start() 函数的重载是start(Promise<Void>)。这里的Promise就是Vert.x异步框架会用到的最多的一个对象(划重点)。Promise对象是从代码上能最直观体现Vert.x的异步非阻塞特性的。
异步非阻塞概念
Vert.x十一个异步非阻塞的框架。如果想熟练高效使用这个最少先把异步非阻塞这两个概念搞明白。
同步、异步
这里的异步,主要是指消息通信机制的异步。例如一个函数调用另一个函数。如果是同步机制,调用者调用被调用者,在被调用者执行过程中,调用者主动等待这个调用,被调用者执行完成后返回对应结果。
异步调用的情况下,调用者调用被调用者后,被调用者执行完成后,主动通知调用者(回调)。
注意同步异步,只是针对被调用者而言,是函数返回结果的通信机制而已。
阻塞、非阻塞
阻塞和非阻塞是针对线程而言的。还是这个例子,先说同步情况下。
如果是同步情况下,调用者调用函数后,当前线程就会挂起,被调用者开始执行,被调用者执行完成返回结果后,调用者线程再继续执行。在Main方法中随便写个最简单的函数调用就是这种。这种调用者线程就是阻塞的。
非阻塞就是在调用者调用一个函数后,不阻塞调用者当前线程,不等待被调用者返回,而是每隔一段时间检查被调用者是否完成。
如果是异步非阻塞。调用者调用对应函数后,不阻塞当前线程,不用等待函数返回。被调用者执行完成后再通知调用者,进行后续处理。
拿KFC的点餐举个栗子
- 以前KFC是点餐和配餐同时的,付钱之后站在原地等着配餐,后面没点餐的也陪你等配餐,只有你拿到配餐之后,其他排队的才能开始点餐。 同步阻塞型
- 后来KFC经理智商回暖了。新开一个配餐柜台,付钱后拿到一个小票,你就可以该干啥干啥去,不要耽误后面人点餐。你只要同时监控一下配餐窗口显示器,你的小票号码出现在显卡器上就可以取餐。 同步非阻塞
- 随着科技发展,现在点餐不需要你自己监控显示器,而是在配餐完成后发短信给你的支付手机。你就可以去做别的事情,当手机收到通知时再去配餐柜台取餐即可 异步非阻塞
- 至于异步阻塞,emmm~ 大概就是拿着手机支付完还霸占着点餐台等这种行为吧。。。(轻蔑)
Vert.x 异步线程模型
这里推荐一个大神的文章Vert.x 技术内幕 ,当时我最初接触Vert.x也是3.3.3版本,这位大神深入简出,结合官方资料和自己的知识详细介绍了Vert.x的技术原理和细节,看过之后受益良多。这里我就不班门弄斧了。我主要还是从代码和应用角度梳理一下Vert.x的试用。
这些点是一定要注意的
- 我们写的代码,只有两种,Event Loop 线程和 Worker 线程,Event Loop 线程是绝对不能阻塞其运行的。
- 不要自己手动创建线程,使用Vert.x提供的异步接口来创建线程。
Don’t block me!
The Golden Rule - Don’t Block the Event Loop
Running blocking code
示例
定时器
代码示例之前,先给大家介绍一个Vert.x的一个接口执行定期和延迟的操作,在实际开发中也会用到。这里用来帮助我们理解代码也是及其方便的。
在test目录下创建TestTimerVerticle类,修改TestMain,部署TestTimerVerticle
public class TestMain {
public static void main(String[] args) {
// Vertx.vertx().deployVerticle(MainVerticle.class.getName());
Vertx.vertx().deployVerticle(TestTimerVerticle.class.getName());
}
}
实现TestTimerVerticle的start()函数,在方法中添加测试代码
System.out.println(LocalDateTime.now().toString() + ":设置一个timer");
vertx.setTimer(3 * 1000, id -> System.out.println(LocalDateTime.now().toString() + ":3秒到了,该打印 Hello World 了"));
System.out.println(LocalDateTime.now().toString() + ":3秒后打印 Hello World!");
调用setTimer方法,设置一个定时器,定时3秒后执行回调方法,打印3秒到了,该打印 Hello World 了,前面加一个时间戳,判断打印顺序。
2019-09-03T16:45:27.792312:设置一个timer
2019-09-03T16:45:27.794229:3秒后打印 Hello World!
2019-09-03T16:45:30.794139:3秒到了,该打印 Hello World 了
从输出结果可以看出,setTimer 本身就是一个异步非阻塞函数,调用timer函数,并没有阻塞当前start方法,先打印出了3秒后打印 Hello World!,然后3秒后,由timer方法的回调执行3秒到了,该打印 Hello World 了。目前我们可以用这个来模拟各种耗时操作的nio接口,比如查询数据库等。
异步方法
根据前面介绍,实现一个异步方法,只要我们满足函数能立即返回调用,并且在处理完成后主动通知调用方即可。
Promise
Vert.x使用Future表示一个未知结果的调用返回。从3.8.0版本后不再直接使用,而是配合Promise接口使用,创建一个响应式的函数接口非常简单。
声明
private Future<String> getSomething() {
Promise<String> promise = Promise.promise();
System.out.println(LocalDateTime.now().toString() + "调用getSomething函数,查询一些东西,假设查询需要5秒");
System.out.println(LocalDateTime.now().toString() + "必须调用异步api,或者使用WorkerExecutor线程封装阻塞代码。");
vertx.setTimer(5 * 1000, id -> {
System.out.println(LocalDateTime.now().toString() + "--------------------------");
System.out.println(LocalDateTime.now().toString() + "5秒后,查询完成,返回查询结果");
promise.complete("这是查询的结果");
});
System.out.println(LocalDateTime.now().toString() + "返回promise,调用完成,不阻塞调用线程。");
return promise.future();
}
调用
调用方式也很简单
System.out.println("开始测试getSomething");
getSomething().setHandler(res -> {
System.out.println("获取查询结果==>" + res.result());
});
System.out.println("开始测试getSomething 结束,主线程不阻塞,继续执行。");
System.out.println("......");
结果
重新运行项目得到输出
开始测试getSomething
2019-09-04T15:39:17.384836调用getSomething函数,查询一些东西,假设查询需要5秒
2019-09-04T15:39:17.385385必须调用异步api,或者使用WorkerExecutor线程封装阻塞代码。
2019-09-04T15:39:17.387350返回promise,调用完成,不阻塞调用线程。
开始测试getSomething 结束,主线程不阻塞,继续执行。
......
2019-09-04T15:39:22.392342--------------------------
2019-09-04T15:39:22.3924805秒后,查询完成,返回查询结果
获取查询结果==>这是查询的结果
过程梳理
输出内容打印上时间戳,可以明显看出,先打印了返回promise,调用完成,不阻塞调用线程。并且执行了return 方法5秒后才打印5秒后,查询完成,返回查询结果,并且主线程在getSomething函数返回return后立即向下执行。Promise对象之前叫做Future,现在是Promise的一个属性,虽然版本升级我一个人加班改了所有的代码,不过我还是觉得这个名字更直白,方便理解。
调用一个异步函数,函数立即返回一个Promise.future()表示将来可能的结果。而不需要阻塞等待结果。等到运行完成后提交结果完成承诺;调用者在得到Future后绑定一个Handler,当Promise被提交的时候,运行Handler里的内容。handler中的参数,就是Promise提交的内容。细看一下Promise接口
/**
* Create a promise that hasn't completed yet
*
* @param <T> the result type
* @return the promise
*/
static <T> Promise<T> promise() {
return factory.promise();
}
Promise和Futurn一样由Factory工厂产生,而且提供了静态方法方便试用。
/**
* Set the result. Any handler will be called, if there is one, and the promise will be marked as completed.
* <p/>
* Any handler set on the associated promise will be called.
*
* @param result the result
* @throws IllegalStateException when the promise is already completed
*/
void complete(T result);
/**
* Calls {@code complete(null)}
*
* @throws IllegalStateException when the promise is already completed
*/
void complete();
/**
* Set the failure. Any handler will be called, if there is one, and the future will be marked as completed.
*
* @param cause the failure cause
* @throws IllegalStateException when the promise is already completed
*/
void fail(Throwable cause);
/**
* Calls {@link #fail(Throwable)} with the {@code message}.
*
* @param message the failure message
* @throws IllegalStateException when the promise is already completed
*/
void fail(String message);
/**
* Like {@link #complete(Object)} but returns {@code false} when the promise is already completed instead of throwing
* an {@link IllegalStateException}, it returns {@code true} otherwise.
*
* @param result the result
* @return {@code false} when the future is already completed
*/
boolean tryComplete(T result);
/**
* Calls {@code tryComplete(null)}.
*
* @return {@code false} when the future is already completed
*/
boolean tryComplete();
/**
* Like {@link #fail(Throwable)} but returns {@code false} when the promise is already completed instead of throwing
* an {@link IllegalStateException}, it returns {@code true} otherwise.
*
* @param cause the failure cause
* @return {@code false} when the future is already completed
*/
boolean tryFail(Throwable cause);
/**
* Calls {@link #fail(Throwable)} with the {@code message}.
*
* @param message the failure message
* @return false when the future is already completed
*/
boolean tryFail(String message);
当函数内容处理完成后使用complete()函数提交结果,如果发生异常或需要标记失败的结果,可以使用fail()函数。需要注意一个承诺只能提交一次,一旦提交就不能改口了(多浅显的道理,为啥那么多人不懂呢?)。如果函数分支太多或逻辑过于复杂又没去拆分简化,可以使用tryComplete()和tryFail()尝试提交,如果已经被提交返回false,否则提交结果。
Handle
实现一个Handle接口,调用Promise.future.handle()进行绑定,当Promise提交返回结果时,触发执行handle接口对象,实现回调。
handle这里又涉及了一个jdk8的新特性,函数接口,简单一句话:有且只有一个公开的抽象方法的接口,就叫做函数接口。
使用注解@FunctionalInterface可以检查是否符合规范。函数接口诠释了**函数即对象**的概念,实现一个函数接口只需要实现函数接口里唯一的抽象方法即可。
Vert.x的Handle接口是一个使用十分广泛,十分重要的接口,大部分异步api回调函数都是使用了Handle接口。
Future
如果说Promise是异步回调的方式和通道,Future就是回调消息的本体。先看一下Future接口,继承了两个接口AsyncResult<T>和 Handler<AsyncResult<T>>。AsyncResult是一个异步回调返回的消息内容。主要用到的函数有
T result(); // 回调消息的内容,相当于返回值,泛型在创建Promise时指定
Throwable cause(); // 调用fail函数返回错误是,用于保存错误信息
boolean succeeded(); // 表示已经调用complete()成功回调
boolean failed(); // 表示已经调用fail()失败回调
Handler已经说过了,是一个函数接口,回调后会自动执行(实际是Future中调用,3.8.0版本在io.vertx.core.impl.FutureImpl:126可以看到)。Handler的泛型类型是AsyncResult,表示函数接口的入参。在实现handler的时候可以直接获取Future的AsyncResult对象得到回调消息,进行处理。
Future 本身的主要函数有
static <T> Future<T> future(Handler<Promise<T>> handler){};//工厂方法。创建一个未完成的Future,并且为绑定指定Promise的handler,promise需要是未返回的。
static <T> Future<T> future(){};//工厂方法,创建一个future
static <T> Future<T> succeededFuture() {}
static <T> Future<T> succeededFuture(T result) {} //工厂方法,创建一个成功返回的future 同步返回使用
static <T> Future<T> failedFuture(Throwable t){}
static <T> Future<T> failedFuture(String failureMessage){}//工厂方法,创建一个失败返回的future 同步返回使用
boolean isComplete(); 判断是否已经提交返回
Future<T> setHandler(Handler<AsyncResult<T>> handler);// 绑定handler
void complete();
void complete(T result); //返回成功
void fail(Throwable cause);
void fail(String failureMessage); //返回失败
Future调用还有一个需要注意的点,在异步函数中,返回异步结果的时候,也就是调用complete()或fail()时,保证在setHandler()绑定处理之后。否则在提交返回时,handler为空就无法触发回调处理。
同步返回
但是有些场景下,我们可能不需要进行异步操作,直接就能同步返回future的结果。这时上面示例的代码就无法运行了。需要用到同步返回的操作方式
还是刚才的例子比如我们在调用getSomething()时传递一个int参数,如果int是0,直接返回成功,如果int不是0执行异步处理,1返回成功,否则返回失败。我们修改一下代码
private Future<String> getSomething(int i) {
if (i == 0) return Future.succeededFuture("i == 0 ,这里是直接返回的内容");
Promise<String> promise = Promise.promise();
System.out.println(LocalDateTime.now().toString() + "调用getSomething函数,查询一些东西,假设查询需要5秒");
System.out.println(LocalDateTime.now().toString() + "必须调用异步api,或者使用WorkerExecutor线程封装阻塞代码。");
vertx.setTimer(5 * 1000, id -> {
System.out.println(LocalDateTime.now().toString() + "--------------------------");
System.out.println(LocalDateTime.now().toString() + "5秒后,查询完成,返回查询结果");
if (i == 1)
promise.complete(" i == 1,这里返回成功结果");
else
promise.fail(" i == " + i + ",这里返回失败结果");
});
System.out.println(LocalDateTime.now().toString() + "返回promise,调用完成,不阻塞调用线程。");
return promise.future();
}
System.out.println("开始测试getSomething(int)");
getSomething(0).setHandler(res -> {
if (res.succeeded()) {
System.out.println("如果是调用complete方法提交,执行这里");
System.out.println("获取查询结果==>" + res.result());
} else {
System.out.println("如果是调用fail方法提交,执行这里");
System.out.println("查询错误是的返回信息==>" + res.cause().getMessage());
}
});
getSomething(1).setHandler(res -> {
if (res.succeeded()) {
System.out.println("如果是调用complete方法提交,执行这里");
System.out.println("获取查询结果==>" + res.result());
} else {
System.out.println("如果是调用fail方法提交,执行这里");
System.out.println("查询错误是的返回信息==>" + res.cause().getMessage());
}
});
getSomething(2).setHandler(res -> {
if (res.succeeded()) {
System.out.println("如果是调用complete方法提交,执行这里");
System.out.println("获取查询结果==>" + res.result());
} else {
System.out.println("如果是调用fail方法提交,执行这里");
System.out.println("查询错误是的返回信息==>" + res.cause().getMessage());
}
});
System.out.println("......");
输出结果
开始测试getSomething(int)
如果是调用complete方法提交,执行这里
获取查询结果==>i == 0 ,这里是直接返回的内容
2019-09-05T15:17:21.853280调用getSomething函数,查询一些东西,假设查询需要5秒
2019-09-05T15:17:21.853649必须调用异步api,或者使用WorkerExecutor线程封装阻塞代码。
2019-09-05T15:17:21.855346返回promise,调用完成,不阻塞调用线程。
2019-09-05T15:17:21.855623调用getSomething函数,查询一些东西,假设查询需要5秒
2019-09-05T15:17:21.855675必须调用异步api,或者使用WorkerExecutor线程封装阻塞代码。
2019-09-05T15:17:21.855741返回promise,调用完成,不阻塞调用线程。
......
2019-09-05T15:17:26.856019--------------------------
2019-09-05T15:17:26.8561395秒后,查询完成,返回查询结果
如果是调用complete方法提交,执行这里
获取查询结果==> i == 1,这里返回成功结果
2019-09-05T15:17:26.856471--------------------------
2019-09-05T15:17:26.8565615秒后,查询完成,返回查询结果
如果是调用fail方法提交,执行这里
查询错误是的返回信息==> i == 2,这里返回失败结果
从输出结果地三行可以看出,i == 0的情况下,直接执行了handler函数。因为方法提前return,后面的代码就不用管了。
彩蛋
同步返回测试代码中三个handler中的代码完全一样,可以封装简化。
新测试代码
Handler<AsyncResult<String>> handler1 = new Handler<AsyncResult<String>>() {
@Override
public void handle(AsyncResult<String> event) {
if (event.succeeded()) {
System.out.println("如果是调用complete方法提交,执行这里");
System.out.println("获取查询结果==>" + event.result());
} else {
System.out.println("如果是调用fail方法提交,执行这里");
System.out.println("查询错误是的返回信息==>" + event.cause().getMessage());
}
}
};
Handler<AsyncResult<String>> handler = res -> {
if (res.succeeded()) {
System.out.println("如果是调用complete方法提交,执行这里");
System.out.println("获取查询结果==>" + res.result());
} else {
System.out.println("如果是调用fail方法提交,执行这里");
System.out.println("查询错误是的返回信息==>" + res.cause().getMessage());
}
};
System.out.println("开始测试getSomething(int)");
getSomething(0).setHandler(handler);
getSomething(1).setHandler(handler);
getSomething(2).setHandler(handler);
System.out.println("......");
handler和handler1完全相同。handler1是jdk8以前接口的匿名实现方式。handler 使用lambda对函数接口的简化声明。代码相对简洁许多。