Full Spring Reactive

背景

之前写的一些项目都是仅用了一半的Reactive写法,到数据库这边就还是传统的写法

因为现在最常用的就是 MyBatis,然而并不支持,甚至说 driver 都很难支持的全。

不过踩坑无数 终于写了个完整的Reactive应用并在生产环境发布了。

(偷偷说一句 即使是全reactive,也不会带来什么性能提升)

总结坑点

  1. mysql driver支持太差。
  2. 数据库API为: r2dbc
  3. R2dbcRepository的字段映射
  4. SQL Statement 参数绑定
  5. 模版引擎的返回处理
  6. 请求参数处理
  7. 多步聚合处理
  8. 多线程 Flux
  9. Reactive 消费
  10. doOn 系列勾子

Reactive

1. MySql Driver

驱动还是非关系性数据库比较好,mongodb 就很少出问题。

数据库驱动可以在:https://r2dbc.io 找到

mysql下有个 mirromutth/r2dbc-mysql 提供的简单版驱动,但这个实在太简单了,好多东西都没支持,例如 参数绑定 就有问题(细节到 4 再讲)。

这里需要隆重的介绍 mariadb,在mysql被oracle收购之后,作者就弄了个mysql的分支 并且宣布高度兼容mysql,这就是mariadb,在此之后 也得到了部分Linux发行版的支持,例如ubuntu就有这个源,而不提供mysql。

mariadb的driver的官方驱动已支持reactive,并且在活跃维护。

maven:

<dependency>
    <groupId>org.mariadb</groupId>
    <artifactId>r2dbc-mariadb</artifactId>
    <scope>runtime</scope>
</dependency>

2. r2dbc

注意了 我们传统都是使用 JDBC API的,但 reactive 下为 r2dbc,所以和普通配置要是有些区别的

  1. 这里不再是 spring.datasource
  2. 不再是 jdbc:mariadb://

再次注意 不要为 r2dbc:mysql://,行不通的。

spring config:

spring:
  r2dbc:
    url: r2dbc:mariadb://{your_mysql_ip}/test
    username: root
    password: root

3. R2dbcRepository 字段映射

在做CRUD的时候 可以直接用 R2dbcRepository,

但是字段映射会有些区别。

例如日期: 一般情况下 我们都是 java.util.Date

但Date不再受支持了。可以用 LocalDateTime 替代,也可以用 Instant 来替代。

当然在序列化的时候也会碰到一些问题 ,可以在中间步骤再转换为 Date。或者折腾序列化配置。

4. SQL Statement 参数绑定

在一些聚合查询/联表查询的时候 都会用到一个注解:@Query

@Query的参数绑定也会被坑到,这个和官方文档描述的并不一致(所以 Reactive 生态并不完善,上生产要慎重)。

并且驱动一定不能是 r2dbc-mysql,是不支持这样的简单参数绑定的。

e.g:

String name = "wayne%" // 入参的值可以直接包含模糊查询的表达式

@Query("select * from user where name like :name or email like :name")
Flux<User> selectUserByName(String name);

5. 模版引擎的返回处理

在 Spring MVC中 一般使用 ModelAndView 来返回视图和数据。

但reactive中没有这个API。需要用 Rendering 来替代。

Rendering.view("data/page")
    .modelAttribute("data", data)
    .build();

6. 请求参数处理

在目前版本中,入参解析只会从url中取,但一些POST请求可能会把参数放在body中,这样的话 spring不做解析。

目前只能自己手动处理。

在Controller方法参数中添加 ServerHttpRequest request

通过request来获取body内容并解析

final Flux<DataBuffer> body = request.getBody();
final String body = dataBuffer.toString(StandardCharsets.UTF_8);

Map<String, String> param = initQueryParams(body)
                                .toSingleValueMap();


//  ------------- 这两个方法其实还是从Spring那里复制来的 只不过之前是从URI里解析 现在直接从内容解析

private static final Pattern QUERY_PATTERN = Pattern.compile("([^&=]+)(=?)([^&]+)?");

private static MultiValueMap<String, String> initQueryParams(String query) {
    MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
    if (query != null) {
        Matcher matcher = QUERY_PATTERN.matcher(query);
        while(matcher.find()) {
            String name = Tools.decodeQueryParam(matcher.group(1));
            String eq = matcher.group(2);
            String value = matcher.group(3);
            value = value != null ? Tools.decodeQueryParam(value) : (StringUtils.hasLength(eq) ? "" : null);
            queryParams.add(name, value);
        }
    }

    return queryParams;
}

private static String decodeQueryParam(String value) {
    return URLDecoder.decode(value, StandardCharsets.UTF_8);
}

7. 多步聚合处理

一般业务下 会从一个列表中 把一个元素和相关联的元素也找出来 然后拼装后返回。

在Reactive下和以往的思考方式不同。

通过 repository 获取一个列表后,可以通过 map/flatMap 来转换到 wrap 的对象

在map方法中可以 调用多个查询 通过 zip 来组装并返回。

e.g:

// 只是伪代码

@GetMapping
Flux<UserWrap> findAllUser() {

    return userRepos.findAll()
    .flatMap(this::findWrap);
}

Mono<UserWrap> findWrap(User user) {
    Mono<Settings> settings = findUserSettings(user.getUserId);
    Mono<Privacy> privacy = findUserPrivacy(user.getUserId);

    return Mono.zip(settings, privacy)
            .map(data -> {
                return UserWrap.builder
                        .user(user)
                        .settings(data.getT1())
                        .privacy(data.getT2())
                        .build();
            })
}

8. 多线程 Flux

接着 step 7,尽管已经发挥出 reactive 的 API了。但是还是有可以优化的地方。

例如可以多线程并发查询。

在 stream API 中 可以直接 用 parallel 来处理并发流。

但 Reactive API 则提供了 更灵活的并发方式。 接着上面的例子来写。

e.g:

@GetMapping
Flux<UserWrap> findAllUser() {

    return userRepos.findAll()
    .parallel() // 并发 Flux
    .runOn(Schedulers.parallel()) // 多线程调度器
    .flatMap(this::findWrap)
    .sequential(); // 合并并发结果
}

9. Reactive 消费

其实应该放在第一点来讲的,对于刚入坑来讲是非常重要的。不过嘛 我就不按照套路来了(才不是忘记写了)

reactive 中 一个有意思的一点就是,任何程序 都必须要有消费方,否则将不会被执行.

例如:

// 传统编程 TODO (叫啥名字来着)
System.out.printf("program run %n"); // 1. 会执行

System.out.printf("%s %n", 1 / 0) // 2. 会执行并报错

// reactive
Mono.just("program run"); // 1. 不会执行

Mono.just("program run").subscribe(); // 2. 会执行

Mono.just(1 / 0); // 3. 不会执行(也不会报错)

我之前在 Demo 中 错误的使用了 block 方法。但实际上在整个链路中是不能随便的block的。

10. doOn 系列勾子

一个很简单的问题,在使用多线程下 往往想知道 这一系列的多线程操作后总花费时间是多少,或者结果是什么。或者每一个元素的每一步是什么。

这时候 doOn系列方法就很有用。

doOn系列方法 仅仅会监听 而不改变 Publisher 的状态。并且和调用位置有关联。

跟着 step 8 来做一个实例:

@GetMapping
Flux<UserWrap> findAllUser() {

    return userRepos.findAll()
    .parallel() // 并发 Flux
    .runOn(Schedulers.parallel()) // 多线程调度器
    .flatMap(this::findWrap)
    .sequential()  // 合并并发结果
    .doOnComplete(() -> {
                    System.out.printf("exec time %s ms %n", System.currentTimeMillis() - time);
                }); // 在整个 Flux 完成后被调用
}