Full Spring Reactive
背景
之前写的一些项目都是仅用了一半的Reactive写法,到数据库这边就还是传统的写法
因为现在最常用的就是 MyBatis,然而并不支持,甚至说 driver 都很难支持的全。
不过踩坑无数 终于写了个完整的Reactive应用并在生产环境发布了。
(偷偷说一句 即使是全reactive,也不会带来什么性能提升)
总结坑点
- mysql driver支持太差。
- 数据库API为: r2dbc
- R2dbcRepository的字段映射
- SQL Statement 参数绑定
- 模版引擎的返回处理
- 请求参数处理
- 多步聚合处理
- 多线程 Flux
- Reactive 消费
- doOn 系列勾子
Reactive
1. MySql Driver
驱动还是非关系性数据库比较好,mongodb 就很少出问题。
数据库驱动可以在:https://r2dbc.io 找到
mysql下有个 mirromutth/r2dbc-mysql 提供的简单版驱动,但这个实在太简单了,好多东西都没支持,例如 参数绑定 就有问题(细节到 4 再讲)。
这里需要隆重的介绍 mariadb,在mysql被oracle收购之后,作者就弄了个mysql的分支 并且宣布高度兼容mysql,这就是mariadb,在此之后 也得到了部分Linux发行版的支持,例如ubuntu就有这个源,而不提供mysql。
mariadb的driver的官方驱动已支持reactive,并且在活跃维护。
maven:
<dependency>
<groupId>org.mariadb</groupId>
<artifactId>r2dbc-mariadb</artifactId>
<scope>runtime</scope>
</dependency>
2. r2dbc
注意了 我们传统都是使用 JDBC API的,但 reactive 下为 r2dbc,所以和普通配置要是有些区别的
- 这里不再是 spring.datasource
- 不再是 jdbc:mariadb://
再次注意 不要为 r2dbc:mysql://,行不通的。
spring config:
spring:
r2dbc:
url: r2dbc:mariadb://{your_mysql_ip}/test
username: root
password: root
3. R2dbcRepository 字段映射
在做CRUD的时候 可以直接用 R2dbcRepository,
但是字段映射会有些区别。
例如日期: 一般情况下 我们都是 java.util.Date
但Date不再受支持了。可以用 LocalDateTime 替代,也可以用 Instant 来替代。
当然在序列化的时候也会碰到一些问题 ,可以在中间步骤再转换为 Date。或者折腾序列化配置。
4. SQL Statement 参数绑定
在一些聚合查询/联表查询的时候 都会用到一个注解:@Query
@Query的参数绑定也会被坑到,这个和官方文档描述的并不一致(所以 Reactive 生态并不完善,上生产要慎重)。
并且驱动一定不能是 r2dbc-mysql,是不支持这样的简单参数绑定的。
e.g:
String name = "wayne%" // 入参的值可以直接包含模糊查询的表达式
@Query("select * from user where name like :name or email like :name")
Flux<User> selectUserByName(String name);
5. 模版引擎的返回处理
在 Spring MVC中 一般使用 ModelAndView 来返回视图和数据。
但reactive中没有这个API。需要用 Rendering 来替代。
Rendering.view("data/page")
.modelAttribute("data", data)
.build();
6. 请求参数处理
在目前版本中,入参解析只会从url中取,但一些POST请求可能会把参数放在body中,这样的话 spring不做解析。
目前只能自己手动处理。
在Controller方法参数中添加 ServerHttpRequest request
通过request来获取body内容并解析
final Flux<DataBuffer> body = request.getBody();
final String body = dataBuffer.toString(StandardCharsets.UTF_8);
Map<String, String> param = initQueryParams(body)
.toSingleValueMap();
// ------------- 这两个方法其实还是从Spring那里复制来的 只不过之前是从URI里解析 现在直接从内容解析
private static final Pattern QUERY_PATTERN = Pattern.compile("([^&=]+)(=?)([^&]+)?");
private static MultiValueMap<String, String> initQueryParams(String query) {
MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
if (query != null) {
Matcher matcher = QUERY_PATTERN.matcher(query);
while(matcher.find()) {
String name = Tools.decodeQueryParam(matcher.group(1));
String eq = matcher.group(2);
String value = matcher.group(3);
value = value != null ? Tools.decodeQueryParam(value) : (StringUtils.hasLength(eq) ? "" : null);
queryParams.add(name, value);
}
}
return queryParams;
}
private static String decodeQueryParam(String value) {
return URLDecoder.decode(value, StandardCharsets.UTF_8);
}
7. 多步聚合处理
一般业务下 会从一个列表中 把一个元素和相关联的元素也找出来 然后拼装后返回。
在Reactive下和以往的思考方式不同。
通过 repository 获取一个列表后,可以通过 map/flatMap
来转换到 wrap 的对象
在map方法中可以 调用多个查询 通过 zip
来组装并返回。
e.g:
// 只是伪代码
@GetMapping
Flux<UserWrap> findAllUser() {
return userRepos.findAll()
.flatMap(this::findWrap);
}
Mono<UserWrap> findWrap(User user) {
Mono<Settings> settings = findUserSettings(user.getUserId);
Mono<Privacy> privacy = findUserPrivacy(user.getUserId);
return Mono.zip(settings, privacy)
.map(data -> {
return UserWrap.builder
.user(user)
.settings(data.getT1())
.privacy(data.getT2())
.build();
})
}
8. 多线程 Flux
接着 step 7,尽管已经发挥出 reactive 的 API了。但是还是有可以优化的地方。
例如可以多线程并发查询。
在 stream API 中 可以直接 用 parallel 来处理并发流。
但 Reactive API 则提供了 更灵活的并发方式。 接着上面的例子来写。
e.g:
@GetMapping
Flux<UserWrap> findAllUser() {
return userRepos.findAll()
.parallel() // 并发 Flux
.runOn(Schedulers.parallel()) // 多线程调度器
.flatMap(this::findWrap)
.sequential(); // 合并并发结果
}
9. Reactive 消费
其实应该放在第一点来讲的,对于刚入坑来讲是非常重要的。不过嘛 我就不按照套路来了(才不是忘记写了)
reactive 中 一个有意思的一点就是,任何程序 都必须要有消费方,否则将不会被执行.
例如:
// 传统编程 TODO (叫啥名字来着)
System.out.printf("program run %n"); // 1. 会执行
System.out.printf("%s %n", 1 / 0) // 2. 会执行并报错
// reactive
Mono.just("program run"); // 1. 不会执行
Mono.just("program run").subscribe(); // 2. 会执行
Mono.just(1 / 0); // 3. 不会执行(也不会报错)
我之前在 Demo 中 错误的使用了 block 方法。但实际上在整个链路中是不能随便的block的。
10. doOn 系列勾子
一个很简单的问题,在使用多线程下 往往想知道 这一系列的多线程操作后总花费时间是多少,或者结果是什么。或者每一个元素的每一步是什么。
这时候 doOn系列方法就很有用。
doOn系列方法 仅仅会监听 而不改变 Publisher 的状态。并且和调用位置有关联。
跟着 step 8 来做一个实例:
@GetMapping
Flux<UserWrap> findAllUser() {
return userRepos.findAll()
.parallel() // 并发 Flux
.runOn(Schedulers.parallel()) // 多线程调度器
.flatMap(this::findWrap)
.sequential() // 合并并发结果
.doOnComplete(() -> {
System.out.printf("exec time %s ms %n", System.currentTimeMillis() - time);
}); // 在整个 Flux 完成后被调用
}