Why use reactive processing?
关于SpringBoot WebFlux,Spring官网有如下解释很能说明问题:
Reactive系统具有某些特性,使其成为低延迟、高吞吐量工作负载的理想选择。Project Reactor和Spring portfolio协同工作,使开发人员能够构建响应性、弹性、弹性和消息驱动的企业级反应式系统。
Reactive processing反应式处理是一种范例,使开发人员能够构建能够处理背压(流量控制)的非阻塞、异步应用程序。反应式系统更好地利用现代处理器。 此外,在反应式编程中包含背压可确保解耦组件之间具有更好的弹性。
Project Reactor
Spring WebFlux
使用示例
pom.xml
<!-- Spring Boot版本 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!-- 引入Spring Webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Reactive反应式数据库驱动 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
</dependency>
<!--HikariCP连接池在R2BC中不可用,连接池选择使用R2DBC Pool-->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
<!--Spring Boot 2.0开始内置ReactiveStringRedisTemplate支持反应式高背压Redis访问-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Repository
数据访问层通过扩展ReactiveCrudRepository接口,继承Entity在MySQL数据库上执行通用CRUD操作的方法。真香,解放了双手,写最少的代码干最多的事儿...
/**ReactiveCrudRepository注释
*Interface for generic CRUD operations on a repository for a specific type. This *repository follows reactive paradigms and uses Project Reactor types which are *built on top of Reactive Streams.
*针对特定类型的Repository上的通用CRUD操作的接口。
*这个Repository遵循反应式式范例,并使用构建在响应式流之上的Project Reactor类型。
**/
public interface RouteRuleRepository extends ReactiveCrudRepository<RouteRule, Long> {
}
server.port=9010
logging.level.org.springframework.r2dbc=DEBUG
spring.r2dbc.url=r2dbc:mysql://root:123456@localhost:3306/test?serverZoneId=GMT%2B8&sslMode=DISABLED
spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.initial-size=50
spring.r2dbc.pool.max-size=100
#Redis自动化配置类
#SpringBoot2.0之后默认使用的Lettuce客户端连接Redis服务器
#Jedis在实现上是直接连接redis server,如果在多线程环境下是非线程安全的,这个时候只有使用连接池,#为每个Jedis实例增加物理连接
#Lettuce的连接是基于Netty的,连接实例(StatefulRedisConnection)可以在多个线程间并发访问,应
#为StatefulRedisConnection是线程安全的,所以一个连接实例(StatefulRedisConnection)就可以满#足多线程环境下的并发访问,当然这个也是可伸缩的设计,一个连接实例不够的情况也可以按需增加连接实例
#默认情况下连接本地单实例Redis Server是不需要配置的
#生产环境可以通过RedisProperties配置连接哨兵或Cluster集群Redis Server
Service
业务逻辑层通过反应式组件返回Mono或Flux发射器。
Mono和Flux是Project Reactor对Reactive Stream规范中Publisher的实现,Publisher是一个可以提供 0-N 个序列元素的提供者,并根据其订阅者Subscriber的需求推送元素(高背压)。
反应式代码用更少的资源做更多的工作!Reactive Processing更好地利用现代处理器。 此外,在反应式编程中包含背压可确保解耦组件之间具有更好的弹性。
@Service
public class RouteRuleService {
@Autowired
private RouteRuleRepository routeRuleRepository;
@Autowired
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
public Flux<RouteRule> findAll() {
return routeRuleRepository.findAll();
}
/**根据Id查询Entity
*首先从Redis缓存中查询,如果缓存没有命中则从MySQL中查询
**/
public Mono<RouteRule> findById(Long id) {
return findByIdFromRedis(id)
.switchIfEmpty(findByIdFromDB(id));
}
private Mono<RouteRule> findByIdFromRedis(Long id) {
String redisKey = RedisUtil.getKey(DataConstant.STR_ROUTE_RULE, String.valueOf(id));
return reactiveStringRedisTemplate.opsForValue().get(redisKey)
.switchIfEmpty(Mono.empty())
.transform(mono -> mono.map(val -> JsonUtil.string2Obj(val, RouteRule.class)));
}
/**根据Id从MySQL中查询Entity
*数据库中存在时缓存至Redis 10min 暂不考虑 缓存穿透 和 缓存雪崩
**/
private Mono<RouteRule> findByIdFromDB(Long id) {
String redisKey = RedisUtil.getKey(DataConstant.STR_ROUTE_RULE, String.valueOf(id));
return routeRuleRepository.findById(id)
.switchIfEmpty(Mono.error(new InstanceNotFoundException("route rule not exist")))
.doOnSuccess(routeRule -> reactiveStringRedisTemplate.opsForValue()
.set(redisKey, JsonUtil.obj2String(routeRule), Duration.ofMinutes(10L)).subscribe());
}
}
Controller
Mono和Flux发射器,WebFlux的异步非阻塞任务调度器Scheduler,是Reactor的核心,后续会有专题介绍。学习Reactor最好的办法就是查看注释,Reactor的注释真的是业界良心...
@RestController
public class IndexController {
@Autowired
private RouteRuleService routeRuleService;
@GetMapping
public Flux<RouteRule> index() {
return routeRuleService.findAll();
}
@GetMapping("/{id}")
public Mono<RouteRule> get(@PathVariable(name = "id") Long id) {
return routeRuleService.findById(id);
}
}
项目结构