0
点赞
收藏
分享

微信扫一扫

SpringBoot WebFlux系列(一):使用示例

小亦同学321 2022-04-13 阅读 60
spring

Why use reactive processing?

关于SpringBoot WebFlux,Spring官网有如下解释很能说明问题:

Reactive系统具有某些特性,使其成为低延迟、高吞吐量工作负载的理想选择。Project Reactor和Spring portfolio协同工作,使开发人员能够构建响应性、弹性、弹性和消息驱动的企业级反应式系统。

Reactive processing反应式处理是一种范例,使开发人员能够构建能够处理背压(流量控制)的非阻塞、异步应用程序。反应式系统更好地利用现代处理器。 此外,在反应式编程中包含背压可确保解耦组件之间具有更好的弹性

Project Reactor

Spring WebFlux

 使用示例

pom.xml

<!-- Spring Boot版本 -->
<parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.6.6</version>
      <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
        <!-- 引入Spring Webflux -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!-- Reactive反应式数据库驱动 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>dev.miku</groupId>
            <artifactId>r2dbc-mysql</artifactId>
        </dependency>
        <!--HikariCP连接池在R2BC中不可用,连接池选择使用R2DBC Pool-->
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-pool</artifactId>
        </dependency>

        <!--Spring Boot 2.0开始内置ReactiveStringRedisTemplate支持反应式高背压Redis访问-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
</dependencies>

Repository

 数据访问层通过扩展ReactiveCrudRepository接口,继承Entity在MySQL数据库上执行通用CRUD操作的方法。真香,解放了双手,写最少的代码干最多的事儿...

/**ReactiveCrudRepository注释
*Interface for generic CRUD operations on a repository for a specific type. This *repository follows reactive paradigms and uses Project Reactor types which are *built on top of Reactive Streams.
*针对特定类型的Repository上的通用CRUD操作的接口。 
*这个Repository遵循反应式式范例,并使用构建在响应式流之上的Project Reactor类型。
**/
public interface RouteRuleRepository extends ReactiveCrudRepository<RouteRule, Long> {
}
server.port=9010
logging.level.org.springframework.r2dbc=DEBUG
spring.r2dbc.url=r2dbc:mysql://root:123456@localhost:3306/test?serverZoneId=GMT%2B8&sslMode=DISABLED
spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.initial-size=50
spring.r2dbc.pool.max-size=100

#Redis自动化配置类
#SpringBoot2.0之后默认使用的Lettuce客户端连接Redis服务器
#Jedis在实现上是直接连接redis server,如果在多线程环境下是非线程安全的,这个时候只有使用连接池,#为每个Jedis实例增加物理连接
#Lettuce的连接是基于Netty的,连接实例(StatefulRedisConnection)可以在多个线程间并发访问,应
#为StatefulRedisConnection是线程安全的,所以一个连接实例(StatefulRedisConnection)就可以满#足多线程环境下的并发访问,当然这个也是可伸缩的设计,一个连接实例不够的情况也可以按需增加连接实例

#默认情况下连接本地单实例Redis Server是不需要配置的
#生产环境可以通过RedisProperties配置连接哨兵或Cluster集群Redis Server

Service

业务逻辑层通过反应式组件返回Mono或Flux发射器。

Mono和Flux是Project Reactor对Reactive Stream规范中Publisher的实现,Publisher是一个可以提供 0-N 个序列元素的提供者,并根据其订阅者Subscriber的需求推送元素(高背压)。

反应式代码用更少的资源做更多的工作!Reactive Processing更好地利用现代处理器。 此外,在反应式编程中包含背压可确保解耦组件之间具有更好的弹性。 

@Service
public class RouteRuleService {

    @Autowired
    private RouteRuleRepository routeRuleRepository;

    @Autowired
    private ReactiveStringRedisTemplate reactiveStringRedisTemplate;

    public Flux<RouteRule> findAll() {
        return routeRuleRepository.findAll();
    }

    /**根据Id查询Entity
    *首先从Redis缓存中查询,如果缓存没有命中则从MySQL中查询
    **/
    public Mono<RouteRule> findById(Long id) {
        return findByIdFromRedis(id)
                .switchIfEmpty(findByIdFromDB(id));
    }

    private Mono<RouteRule> findByIdFromRedis(Long id) {
        String redisKey = RedisUtil.getKey(DataConstant.STR_ROUTE_RULE, String.valueOf(id));
        return reactiveStringRedisTemplate.opsForValue().get(redisKey)
                .switchIfEmpty(Mono.empty())
                .transform(mono -> mono.map(val -> JsonUtil.string2Obj(val, RouteRule.class)));
    }

    /**根据Id从MySQL中查询Entity
    *数据库中存在时缓存至Redis 10min 暂不考虑 缓存穿透 和 缓存雪崩
    **/
    private Mono<RouteRule> findByIdFromDB(Long id) {
        String redisKey = RedisUtil.getKey(DataConstant.STR_ROUTE_RULE, String.valueOf(id));
        return routeRuleRepository.findById(id)
                .switchIfEmpty(Mono.error(new InstanceNotFoundException("route rule not exist")))
                .doOnSuccess(routeRule -> reactiveStringRedisTemplate.opsForValue()
                        .set(redisKey, JsonUtil.obj2String(routeRule), Duration.ofMinutes(10L)).subscribe());
    }
}

Controller

Mono和Flux发射器,WebFlux的异步非阻塞任务调度器Scheduler,是Reactor的核心,后续会有专题介绍。学习Reactor最好的办法就是查看注释,Reactor的注释真的是业界良心...

@RestController
public class IndexController {

    @Autowired
    private RouteRuleService routeRuleService;

    @GetMapping
    public Flux<RouteRule> index() {
        return routeRuleService.findAll();
    }

    @GetMapping("/{id}")
    public Mono<RouteRule> get(@PathVariable(name = "id") Long id) {
        return routeRuleService.findById(id);
    }
}

项目结构

 

举报

相关推荐

0 条评论