0
点赞
收藏
分享

微信扫一扫

Mysql查询字符串中某个字符串出现的次数

minute_5 2023-05-11 阅读 121

文章目录

一、Redis下载及安装

1.1、下载

可以在Windows环境下进入Redis的下载网址:http://redis.io/download ,进行下载安装包

或者直接在Linux存在外网环境下直接下载:wget http://download.redis.io/releases/redis-5.0.3.tar.gz

当前使用版本是:redis-5.0.3.tar.gz

1.2、环境安装

# 安装gcc
yum install gcc

1.3、编译安装

把下载好的redis-5.0.3.tar.gz放在/usr/local文件夹下,并解压

tar -zxvf redis-5.0.3.tar.gz
cd redis-5.0.3

进入到解压好的redis-5.0.3目录下,进行编译与安装

make

1.4、修改配置

下面来修改部分配置,后续详细配置将会在当前系列文章中完整标出

# 后台启动
daemonize yes 
#关闭保护模式,开启的话,只有本机才可以访问redis    
protected-mode no  
# 需要注释掉bind
# bind绑定的是自己机器网卡的ip,如果有多块网卡可以配多个ip,
# 代表允许客户端通过机器的哪些网卡ip去访问,内网一般可以不配置bind,注释掉即可    
# bind 127.0.0.1

这里的bind我需要来额外说明一下:

在后期会配置主从模型、哨兵模式以及集群模式,如果不将这里的bind注释掉,那么默认使用的是127.0.0.1,就会导致外部无法访问,而只能够在当前机器上进行访问。

1.5、启动Redis

src/redis-server redis.conf

1.6、验证Redis是否启动

ps -ef | grep redis

1.7、进入到Redis客户端

src/redis-cli -h ip -p port

1.8、其它命令

1、退出redis客户端

quit/exit

2、退出Redis服务

(1)pkill redis-server
(2)kill 进程号
(3)src/redis-cli shutdown

二、Redis持久化

2.1、RDB(snapshot)

在默认情况下, Redis 将内存数据库快照保存在名字为 dump.rdb 的二进制文件中。
你可以对 Redis 进行设置, 让它在“ N 秒内数据集至少有 M 个改动”这一条件被满足时, 自动保存一次数据集。
比如说, 以下设置会让 Redis 在满足“ 60 秒内有至少有 1000 个键被改动”这一条件时, 自动保存一次数据集:

# save 60 1000    

除了上面在指定时间之内触发的操作之外,还可以手动执行命令生成RDB快照,进入redis客户端执行命令save或bgsave可以生成dump.rdb文件,每次命令执行都会将所有redis内存快照到一个新的rdb文件里,并覆盖原有rdb快照文件。

2.1.1、bgsave的写时复制(COW)机制

Redis 借助操作系统提供的写时复制技术(Copy-On-Write, COW),在生成快照的同时,依然可以正常处理写命令。简单来说,bgsave 子进程是由主线程 fork 生成的,可以共享主线程的所有内存数据。bgsave 子进程运行后,开始读取主线程的内存数据,并把它们写入 RDB 文件。此时,如果主线程对这些数据也都是读操作,那么,主线程和 bgsave 子进程相互不影响。但是,如果主线程要修改一块数据,那么,这块数据就会被复制一份,生成该数据的副本。然后,bgsave 子进程会把这个副本数据写入 RDB 文件,而在这个过程中,主线程仍然可以直接修改原来的数据。

2.1.2、save与bgsave对比

命令savebgsave
IO类型同步异步
是否阻塞redis其它命令否(在生成子进程执行调用fork函数时会有短暂阻塞)
复杂度O(n)O(n)
优点不会消耗额外内存不阻塞客户端命令
缺点阻塞客户端命令需要fork子进程,消耗内存

2.1.2、RDB配置文件

直接查看redis.conf配置文件中的配置信息

①文件名称及其目录配置

可以看下RDB中配置的文件目录以及文件名称

# The filename where to dump the DB
# rdb文件名称
dbfilename dump.rdb

# The working directory.
# dump.rdb和aop文件都在会这个目录下面
dir ./

这里我们可以自己来修改这里的持久化文件的目录

dbfilename dump.rdb
dir /usr/local/redis-5.0.3/dofile
②多久操作会持久化

可以看源文件中的注释

save 900 1
save 300 10
save 60 10000

2.2、AOF(append-only file)

快照功能并不是非常耐久(durable): 如果 Redis 因为某些原因而造成故障停机, 那么服务器将丢失最近写入、且仍未保存到快照中的那些数据。从 1.1 版本开始, Redis 增加了一种完全耐久的持久化方式: AOF 持久化,将修改的每一条指令记录进文件appendonly.aof中(先写入os cache,每隔一段时间fsync到磁盘)

比如执行命令“set lig 666”,aof文件里会记录如下数据

*3
$3
set
$5
lig
$3
666

这是一种resp协议格式数据,星号后面的数字代表命令有多少个参数,$号后面的数字代表这个参数有几个字符

注意,如果执行带过期时间的set命令,aof文件里记录的是并不是执行的原始命令,而是记录key过期的时间戳

比如执行“set tuling 888 ex 1000”,对应aof文件里记录如下

*3
$3
set
$6
tuling
$3
888
*3
$9
PEXPIREAT
$6
tuling
$13
1604249786301

2.2.1、开启AOF

你可以通过修改配置文件来打开 AOF 功能:

appendonly yes

从现在开始, 每当 Redis 执行一个改变数据集的命令时(比如 SET), 这个命令就会被追加到 AOF 文件的末尾。

这样的话, 当 Redis 重新启动时, 程序就可以通过重新执行 AOF 文件中的命令来达到重建数据集的目的。

你可以配置 Redis 多久才将数据 fsync 到磁盘一次。

有三个选项:

appendfsync always:每次有新命令追加到 AOF 文件时就执行一次 fsync ,非常慢,也非常安全。
appendfsync everysec:每秒 fsync 一次,足够快,并且在故障时只会丢失 1 秒钟的数据。
appendfsync no:从不 fsync ,将数据交给操作系统来处理。更快,也更不安全的选择。

推荐(并且也是默认)的措施为每秒 fsync 一次, 这种 fsync 策略可以兼顾速度和安全性。

2.2.2、AOF重写

OF文件里可能有太多没用指令,所以AOF会定期根据内存的最新数据生成aof文件

例如,执行了如下几条命令:

127.0.0.1:6379> incr readcount
(integer) 1
127.0.0.1:6379> incr readcount
(integer) 2
127.0.0.1:6379> incr readcount
(integer) 3
127.0.0.1:6379> incr readcount
(integer) 4
127.0.0.1:6379> incr readcount
(integer) 5

重写后AOF文件里变成

*3
$3
SET
$2
readcount
$1
5

如下两个配置可以控制AOF自动重写频率

# auto-aof-rewrite-min-size 64mb   //aof文件至少要达到64M才会自动重写,文件太小恢复速度本来就很快,重写的意义不大
# auto-aof-rewrite-percentage 100  //aof文件自上一次重写后文件大小增长了100%则再次触发重写

当然AOF还可以手动重写,进入redis客户端执行命令bgrewriteaof重写AOF

注意,AOF重写redis会fork出一个子进程去做(与bgsave命令类似),不会对redis正常命令处理有太多影响

2.3、AOF和RDB的选择

命令RDBAOF
启动优先级
体积
恢复速度
数据安全性容易丢数据根据策略决定

生产环境可以都启用,redis启动时如果既有rdb文件又有aof文件则优先选择aof文件恢复数据,因为aof一般来说数据更全一点。

2.3.1、Redis 4.0之后的混合持久化

重启 Redis 时,我们很少使用 RDB来恢复内存状态,因为会丢失大量数据。我们通常使用 AOF 日志重放,但是重放 AOF 日志性能相对 RDB来说要慢很多,这样在 Redis 实例很大的情况下,启动需要花费很长的时间。 Redis 4.0版本及其之后为了解决这个问题,带来了一个新的持久化选项——混合持久化。

通过如下配置可以开启混合持久化(必须先开启aof):

aof-use-rdb-preamble yes   

如果开启了混合持久化,AOF在重写时,不再是单纯将内存数据转换为RESP命令写入AOF文件,而是将重写这一刻之前的内存做RDB快照处理,并且将RDB快照内容和增量的AOF修改内存数据的命令存在一起,都写入新的AOF文件,新的文件一开始不叫appendonly.aof,等到重写完新的AOF文件才会进行改名,覆盖原有的AOF文件,完成新旧两个AOF文件的替换。

于是在 Redis 重启的时候,可以先加载 RDB 的内容,然后再重放增量 AOF 日志就可以完全替代之前的 AOF 全量文件重放,因此重启效率大幅得到提升。

混合持久化AOF文件结构如下

在这里插入图片描述

2.4、 Redis的四种数据备份策略

  1. 写crontab定时调度脚本,每小时都copy一份rdb或aof的备份到一个目录中去,仅仅保留最近48小时的备份
  2. 每天都保留一份当日的数据备份到一个目录中去,可以保留最近1个月的备份
  3. 每次copy备份的时候,都把太旧的备份给删了
  4. 每天晚上将当前机器上的备份复制一份到其他机器上,以防机器损坏

三、主从架构

主从架构如下所示:

在这里插入图片描述

3.1、环境搭建

我在本地虚拟机中已经安装好了redis,那么首先按照单机配置,启动6379

我在做准备工作之前,首先在redis-5.0.3目录下面新建两个目录:

  • data目录:data/6380和data/6379用来存放持久化文件
  • conf目录:6379和6380用来存放redis-6379.conf和redis-6380.conf配置文件
1、复制一份redis.conf文件

2、将相关配置修改为如下值:
port 6380
pidfile /var/run/redis_6380.pid  # 把pid进程号写入pidfile配置的文件
logfile "6380.log"
dir /usr/local/redis-5.0.3/data/6380  # 指定数据存放目录
# 需要注释掉bind
# bind 127.0.0.1(bind绑定的是自己机器网卡的ip,如果有多块网卡可以配多个ip,代表允许客户端通过机器的哪些网卡ip去访问,内网一般可以不配置bind,注释掉即可)

3、配置主从复制
replicaof 192.168.3.17 6379   # 从本机6379的redis实例复制数据,Redis 5.0之前使用slaveof
replica-read-only yes  # 配置从节点只读

4、启动从节点
redis-server redis.conf

5、连接从节点
redis-cli -p 6380

6、测试在6379实例上写数据,6380实例是否能及时同步新修改数据

7、可以自己再配置一个6381的从节点

在本机测试之后完美搭建成功,然后模拟复制搭建一个6381端口的redis。

3.2、Redis主从工作原理

如果你为master配置了一个slave,不管这个slave是否是第一次连接上Master,它都会发送一个PSYNC命令给master请求复制数据。

master收到PSYNC命令后,会在后台进行数据持久化通过bgsave生成最新的rdb快照文件,持久化期间,master会继续接收客户端的请求,它会把这些可能修改数据集的请求缓存在内存中。当持久化进行完毕以后,master会把这份rdb文件数据集发送给slave,slave会把接收到的数据进行持久化生成rdb,然后再加载到内存中。然后,master再将之前缓存在内存中的命令发送给slave。

当master与slave之间的连接由于某些原因而断开时,slave能够自动重连Master,如果master收到了多个slave并发连接请求,它只会进行一次持久化,而不是一个连接一次,然后再把这一份持久化的数据发送给多个并发连接的slave。

主从复制(全量复制)流程图

在这里插入图片描述

数据部分复制

当master和slave断开重连后,一般都会对整份数据进行复制。但从redis2.8版本开始,redis改用可以支持部分数据复制的命令PSYNC去master同步数据,slave与master能够在网络连接断开重连后只进行部分数据复制(断点续传)。

master会在其内存中创建一个复制数据用的缓存队列,缓存最近一段时间的数据,master和它所有的slave都维护了复制的数据下标offset和master的进程id,因此,当网络连接断开后,slave会请求master继续进行未完成的复制,从所记录的数据下标开始。如果master进程id变化了,或者从节点数据下标offset太旧,已经不在master的缓存队列里了,那么将会进行一次全量数据的复制。

在这里插入图片描述

注意点:如果有很多从节点,为了缓解主从复制风暴(多个从节点同时复制主节点导致主节点压力过大),可以做如下架构,让部分从节点与从节点(与主节点同步)同步数据

在这里插入图片描述

3.3、使用Jedis测试

首先新建SpringBoot项目,引入坐标依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

编写测试代码:

public class JedisSingleTest {
  public static void main(String[] args) throws IOException {

    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
    jedisPoolConfig.setMaxTotal(20);
    jedisPoolConfig.setMaxIdle(10);
    jedisPoolConfig.setMinIdle(5);

    // timeout,这里既是连接超时又是读写超时,从Jedis 2.8开始有区分connectionTimeout和soTimeout的构造函数
    JedisPool jedisPool = new JedisPool(jedisPoolConfig, "192.168.3.17", 6379, 3000, null);

    Jedis jedis = null;
    try {
      //从redis连接池里拿出一个连接执行命令
      jedis = jedisPool.getResource();

      //******* jedis普通操作示例 ********
      System.out.println(jedis.set("single1", "liguang"));
      System.out.println(jedis.get("single1"));
      
      
          //******* 管道示例 ********
          //管道的命令执行方式:cat redis.txt | redis-cli -h 127.0.0.1 -a password - p 6379 --pipe
          Pipeline pl = jedis.pipelined();
            for (int i = 0; i < 10; i++) {
                pl.incr("pipelineKey");
                pl.set("name" + i, "name");
                //模拟管道报错
                pl.setbit("name", -1, true);
            }
            List<Object> results = pl.syncAndReturnAll();
            System.out.println(results);
      
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      //注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。
      if (jedis != null)
        jedis.close();
    }
  }
}

管道(Pipeline)

客户端可以一次性发送多个请求而不用等待服务器的响应,待所有命令都发送完后再一次性读取服务的响应,这样可以极大的降低多条命令执行的网络传输开销,管道执行多条命令的网络开销实际上只相当于一次命令执行的网络开销。需要注意到是用pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。

pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的响应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义,管道中前面命令失败,后面命令不会有影响,继续执行。

详细代码示例见下面jedis连接示例:

Pipeline pl = jedis.pipelined();
for (int i = 0; i < 10; i++) {
  pl.incr("pipelineKey");
  pl.set("name" + i, "name");
  // 模拟管道报错
  // pl.setbit("name", -1, true);
}
List<Object> results = pl.syncAndReturnAll();
System.out.println(results);

四、哨兵高可用架构

在这里插入图片描述

sentinel哨兵是特殊的redis服务,不提供读写服务,主要用来监控redis实例节点

哨兵架构下client端第一次从哨兵找出redis的主节点,后续就直接访问redis的主节点,不会每次都通过sentinel代理访问redis的主节点,当redis的主节点发生变化,哨兵会第一时间感知到,并且将新的redis主节点通知给client端(这里面redis的client端一般都实现了订阅功能,订阅sentinel发布的节点变动消息)

4.1、环境搭建

1、复制一份sentinel.conf文件
cp sentinel.conf sentinel-26379.conf

2、将相关配置修改为如下值:
port 26379
daemonize yes
pidfile "/var/run/redis-sentinel-26379.pid"
logfile "26379.log"
dir "/usr/local/redis-5.0.3/data"
# sentinel monitor <master-redis-name> <master-redis-ip> <master-redis-port> <quorum>
# quorum是一个数字,指明当有多少个sentinel认为一个master失效时(值一般为:sentinel总数/2 + 1),master才算真正失效
sentinel monitor mymaster 192.168.0.60 6379 2   # mymaster这个名字随便取,客户端访问时会用到

3、启动sentinel哨兵实例
src/redis-sentinel sentinel-26379.conf

4、查看sentinel的info信息
src/redis-cli -p 26379
127.0.0.1:26379>info
可以看到Sentinel的info里已经识别出了redis的主从

5、可以自己再配置两个sentinel,端口2638026381,注意上述配置文件里的对应数字都要修改

sentinel集群都启动完毕后,会将哨兵集群的元数据信息写入所有sentinel的配置文件里去(追加在文件的最下面),我们查看下如下配置文件sentinel-26379.conf,如下所示:

# 代表redis主节点的从节点信息
sentinel known-replica mymaster 192.168.0.60 6380 
# 代表redis主节点的从节点信息
sentinel known-replica mymaster 192.168.0.60 6381 
# 代表感知到的其它哨兵节点    
sentinel known-sentinel mymaster 192.168.0.60 26380 52d0a5d70c1f90475b4fc03b6ce7c3c56935760f  
# 代表感知到的其它哨兵节点    
sentinel known-sentinel mymaster 192.168.0.60 26381 e9f530d3882f8043f76ebb8e1686438ba8bd5ca6  

当redis主节点如果挂了,哨兵集群会重新选举出新的redis主节点,同时会修改所有sentinel节点配置文件的集群元数据信息,比如6379的redis如果挂了,假设选举出的新主节点是6380,则sentinel文件里的集群元数据信息会变成如下所示:

#代表主节点的从节点信息
sentinel known-replica mymaster 192.168.0.60 6379 
#代表主节点的从节点信息    
sentinel known-replica mymaster 192.168.0.60 6381 
#代表感知到的其它哨兵节点    
sentinel known-sentinel mymaster 192.168.0.60 26380 52d0a5d70c1f90475b4fc03b6ce7c3c56935760f
#代表感知到的其它哨兵节点    
sentinel known-sentinel mymaster 192.168.0.60 26381 e9f530d3882f8043f76ebb8e1686438ba8bd5ca6  

同时还会修改sentinel文件里之前配置的mymaster对应的6379端口,改为6380

sentinel monitor mymaster 192.168.0.60 6380 2

当6379的redis实例再次启动时,哨兵集群根据集群元数据信息就可以将6379端口的redis节点作为从节点加入集群

4.2、哨兵代码测试

public class JedisSentinelTest {
    public static void main(String[] args) throws IOException {

        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(20);
        config.setMaxIdle(10);
        config.setMinIdle(5);

        String masterName = "mymaster";
        Set<String> sentinels = new HashSet<String>();
        sentinels.add(new HostAndPort("192.168.0.60",26379).toString());
        sentinels.add(new HostAndPort("192.168.0.60",26380).toString());
        sentinels.add(new HostAndPort("192.168.0.60",26381).toString());
        //JedisSentinelPool其实本质跟JedisPool类似,都是与redis主节点建立的连接池
        //JedisSentinelPool并不是说与sentinel建立的连接池,而是通过sentinel发现redis主节点并与其建立连接
        JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(masterName, sentinels, config, 3000, null);
        Jedis jedis = null;
        try {
            jedis = jedisSentinelPool.getResource();
            System.out.println(jedis.set("sentinel", "name"));
            System.out.println(jedis.get("sentinel"));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。
            if (jedis != null)
                jedis.close();
        }
    }
}

4.3、SpringBoot整合Redis哨兵架构

1、引入相关依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-pool2</artifactId>
</dependency>

2、springboot项目核心配置

server:
  port: 8080

spring:
  redis:
    database: 0
    timeout: 3000
    sentinel:    #哨兵模式
      master: mymaster #主服务器所在集群名称
     nodes: 192.168.0.60:26379,192.168.0.60:26380,192.168.0.60:26381
   lettuce:
      pool:
        max-idle: 50
        min-idle: 10
        max-active: 100
        max-wait: 1000

3、代码如下

@RestController
public class IndexController {

    private static final Logger logger = LoggerFactory.getLogger(IndexController.class);

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 测试节点挂了哨兵重新选举新的master节点,客户端是否能动态感知到
     * 新的master选举出来后,哨兵会把消息发布出去,客户端实际上是实现了一个消息监听机制,
     * 当哨兵把新master的消息发布出去,客户端会立马感知到新master的信息,从而动态切换访问的masterip
     *
     * @throws InterruptedException
     */
    @RequestMapping("/test_sentinel")
    public void testSentinel() throws InterruptedException {
        int i = 1;
        while (true){
            try {
                stringRedisTemplate.opsForValue().set("name"+i, i+"");
                System.out.println("设置key:"+ "name" + i);
                i++;
                Thread.sleep(1000);
            }catch (Exception e){
                logger.error("错误:", e);
            }
        }
    }
}

存在问题

但是测试证明,在sentinel选择节点期间,如果还有请求进来,会丢失部分请求数据。

4.4、补充说明

StringRedisTemplate与RedisTemplate详解

spring 封装了 RedisTemplate 对象来进行对redis的各种操作,它支持所有的 redis 原生的 api。在RedisTemplate中提供了几个常用的接口方法的使用,分别是:

private ValueOperations<K, V> valueOps;
private HashOperations<K, V> hashOps;
private ListOperations<K, V> listOps;
private SetOperations<K, V> setOps;
private ZSetOperations<K, V> zSetOps;

RedisTemplate中定义了对5种数据结构操作

redisTemplate.opsForValue();//操作字符串
redisTemplate.opsForHash();//操作hash
redisTemplate.opsForList();//操作list
redisTemplate.opsForSet();//操作set
redisTemplate.opsForZSet();//操作有序set

StringRedisTemplate继承自RedisTemplate,也一样拥有上面这些操作。

StringRedisTemplate默认采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的。

RedisTemplate默认采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。

所谓的JDK序列化就是将对象序列化成/xxx/yyy/ddd,通常来说,公司都会采用自定义操作。

Redis客户端命令对应的RedisTemplate中的方法列表

String类型结构
RedisRedisTemplate rt
set key valuert.opsForValue().set(“key”,“value”)
get keyrt.opsForValue().get(“key”)
del keyrt.delete(“key”)
strlen keyrt.opsForValue().size(“key”)
getset key valuert.opsForValue().getAndSet(“key”,“value”)
getrange key start endrt.opsForValue().get(“key”,start,end)
append key valuert.opsForValue().append(“key”,“value”)
Hash结构
hmset key field1 value1 field2 value2…rt.opsForHash().putAll(“key”,map) //map是一个集合对象
hset key field valuert.opsForHash().put(“key”,“field”,“value”)
hexists key fieldrt.opsForHash().hasKey(“key”,“field”)
hgetall keyrt.opsForHash().entries(“key”) //返回Map对象
hvals keyrt.opsForHash().values(“key”) //返回List对象
hkeys keyrt.opsForHash().keys(“key”) //返回List对象
hmget key field1 field2…rt.opsForHash().multiGet(“key”,keyList)
hsetnx key field valuert.opsForHash().putIfAbsent(“key”,“field”,“value”
hdel key field1 field2rt.opsForHash().delete(“key”,“field1”,“field2”)
hget key fieldrt.opsForHash().get(“key”,“field”)
List结构
lpush list node1 node2 node3…rt.opsForList().leftPush(“list”,“node”)
rt.opsForList().leftPushAll(“list”,list) //list是集合对象
rpush list node1 node2 node3…rt.opsForList().rightPush(“list”,“node”)
rt.opsForList().rightPushAll(“list”,list) //list是集合对象
lindex key indexrt.opsForList().index(“list”, index)
llen keyrt.opsForList().size(“key”)
lpop keyrt.opsForList().leftPop(“key”)
rpop keyrt.opsForList().rightPop(“key”)
lpushx list nodert.opsForList().leftPushIfPresent(“list”,“node”)
rpushx list nodert.opsForList().rightPushIfPresent(“list”,“node”)
lrange list start endrt.opsForList().range(“list”,start,end)
lrem list count valuert.opsForList().remove(“list”,count,“value”)
lset key index valuert.opsForList().set(“list”,index,“value”)
Set结构
sadd key member1 member2…rt.boundSetOps(“key”).add(“member1”,“member2”,…)
rt.opsForSet().add(“key”, set) //set是一个集合对象
scard keyrt.opsForSet().size(“key”)
sidff key1 key2rt.opsForSet().difference(“key1”,“key2”) //返回一个集合对象
sinter key1 key2rt.opsForSet().intersect(“key1”,“key2”)//同上
sunion key1 key2rt.opsForSet().union(“key1”,“key2”)//同上
sdiffstore des key1 key2rt.opsForSet().differenceAndStore(“key1”,“key2”,“des”)
sinter des key1 key2rt.opsForSet().intersectAndStore(“key1”,“key2”,“des”)
sunionstore des key1 key2rt.opsForSet().unionAndStore(“key1”,“key2”,“des”)
sismember key memberrt.opsForSet().isMember(“key”,“member”)
smembers keyrt.opsForSet().members(“key”)
spop keyrt.opsForSet().pop(“key”)
srandmember key countrt.opsForSet().randomMember(“key”,count)
srem key member1 member2…rt.opsForSet().remove(“key”,“member1”,“member2”,…)

封装RedisTemplate工具类

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<Object,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
        RedisTemplate<Object,Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        serializer.setObjectMapper(serializingObjectMapper());
        // value值的序列化采用jacksonRedisSerializer
        template.setValueSerializer(serializer);
        template.setHashValueSerializer(serializer);
        // key的序列化采用StringRedisSerializer
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());

        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    private ObjectMapper serializingObjectMapper(){
        //创建jsr310包下的javaTimeModule对象,其中包含了jdk8以后的时间类型序列化和反序列化配置
        JavaTimeModule javaTimeModule = new JavaTimeModule();
        //LocalDateTime序列化设置为yyyy-MM-dd HH:mm:ss
        LocalDateTimeSerializer localDateTimeSerializer =
        new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        //增加序列化到配置当中
        javaTimeModule.addSerializer(LocalDateTime.class,localDateTimeSerializer);
        //LocalDateTime反序列化设置为yyyy-MM-dd HH:mm:ss
        LocalDateTimeDeserializer localDateTimeDeserializer =
        new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        //增加反序列化到配置当中
        javaTimeModule.addDeserializer(LocalDateTime.class,localDateTimeDeserializer);
        ObjectMapper objectMapper = new ObjectMapper();
        		       objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL,JsonTypeInfo.As.PROPERTY);
        objectMapper.setVisibility(PropertyAccessor.ALL,JsonAutoDetect.Visibility.ANY);
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        //注册配置
        objectMapper.registerModule(javaTimeModule);
        return objectMapper;
    }

}

然后将RedisTemplate分装成工具类

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;

/**
 * @Description redis工具类
 * @Author liguang
 * @Date 2021/4/12 18:07
 */
@Component
@AllArgsConstructor
public class RedisService<K,V> {

    private final RedisTemplate<K,V> redisTemplate;

    /**
     * 添加、修改缓存
     * @param key   key
     * @param value value
     * @return boolean
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public boolean set(K key,V value){
        redisTemplate.opsForValue().set(key,value);
        return true;
    }

    /**
     * 添加、修改缓存并且设置过期时间--原子性操作,但是只有setEX操作,不包含setNX
     * @param key      key
     * @param value    value
     * @param expire   过期时间
     * @param timeUnit 时间单位
     * @return boolean
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public boolean set(K key,V value,long expire,TimeUnit timeUnit){
        redisTemplate.opsForValue().set(key,value,expire,timeUnit);
        return true;
    }

    /**
     * 添加、修改缓存并且设置过期时间--时间单位为:秒
     * @param key    key
     * @param value  value
     * @param expire 过期时间
     * @return boolean
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public boolean set(K key,V value,long expire){
        return set(key,value,expire,TimeUnit.SECONDS);
    }

    /**
     * 添加、修改缓存并且设置过期时间--原子性操作用于分布式加锁操作等。这里包含了sexNX和setEX两个操作
     * @param key      key
     * @param value    value
     * @param expire   过期时间
     * @param timeUnit 时间单位
     * @return boolean
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public boolean setIfAbsent(K key,V value,long expire,TimeUnit timeUnit){
        return Optional.ofNullable(redisTemplate.opsForValue().setIfAbsent(key,value,expire,timeUnit)).orElse(false);
    }

    /**
     * 获取缓存信息
     * @param key key
     * @return V
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public V get(K key){
        return redisTemplate.opsForValue().get(key);
    }

    /**
     * 对key添加过期时间
     * @param key      key
     * @param expire   过期时间
     * @param timeUnit 时间单位
     * @return boolean
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public boolean expire(K key,long expire,TimeUnit timeUnit){
        return Optional.ofNullable(redisTemplate.expire(key,expire,timeUnit)).orElse(false);
    }

    /**
     * 获取key的过期时间 默认单位:秒。-2表示不存在,-1表示永久,正数为正常时间。如果获取失败默认返回-2
     * @param key key
     * @return Long
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public Long getExpire(K key){
        return redisTemplate.getExpire(key);
    }

    /**
     * 根据时间单位,获取key的过期时间 获取key的过期时间。 -2表示不存在,-1表示永久,正数为正常时间。如果获取失败默认返回-2
     * @param key key
     * @return Long
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public Long getExpire(K key,TimeUnit timeUnit){
        return redisTemplate.getExpire(key,timeUnit);
    }

    /**
     * 删除缓存信息
     * @param key key
     * @return boolean
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public boolean delete(K key){
        return Optional.ofNullable(redisTemplate.delete(key)).orElse(false);
    }

    /**
     * 将 key 中储存的数字值增一。 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。
     * <p>
     * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。
     * @param key key
     * @return Long
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public Long increment(K key){
        return redisTemplate.opsForValue().increment(key);
    }

    /**
     * 将 key 中储存的数字值加上增量
     * @param key       key
     * @param increment 增量
     * @return Long
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public Long increment(K key,long increment){
        return redisTemplate.opsForValue().increment(key,increment);
    }

    /**
     * 将 key 中储存的数字值加上增量
     * @param key       key
     * @param increment 增量
     * @return Double
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public Double increment(K key,double increment){
        return redisTemplate.opsForValue().increment(key,increment);
    }

    /**
     * 将 key 中储存的数字值减一。 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。
     * <p>
     * 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。
     * @param key key
     * @return Long
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public Long decrement(K key){
        return redisTemplate.opsForValue().decrement(key);
    }

    /**
     * 将 key 中储存的数字值减去指定数值
     * @param key       key
     * @param increment 指定数值-减量
     * @return Long
     * @author liguang
     * @date 2019/11/1 19:50
     **/
    public Long decrement(K key,long increment){
        return redisTemplate.opsForValue().decrement(key,increment);
    }

    /**
     * 设置hash存储,包含redis key对应一个hash桶。默认value为key
     * @param key     redis key
     * @param hashKey hashKey
     * @return boolean
     * @author liguang
     * @date 2021/4/20 12:35
     **/
    public boolean setForHash(K key,Object hashKey){
        redisTemplate.opsForHash().put(key,hashKey,hashKey);
        return true;
    }

    /**
     * 设置hash存储,包含redis key对应一个hash桶
     * @param key     redis key
     * @param hashKey hashKey
     * @param value   value
     * @return boolean
     * @author liguang
     * @date 2021/4/20 12:35
     **/
    public boolean setForHash(K key,Object hashKey,Object value){
        redisTemplate.opsForHash().put(key,hashKey,value);
        return true;
    }

    /**
     * 根据redis key和hash key获取值
     * @param key     redis key
     * @param hashKey hashKey
     * @return java.lang.Object
     * @author liguang
     * @date 2021/4/20 12:34
     **/
    public Object getForHash(K key,Object hashKey){
        return redisTemplate.opsForHash().get(key,hashKey);
    }

    /**
     * 根据redis key删除指定hashkey的值,可以为多个
     * @param key     redis key
     * @param hashKey hashKey
     * @return java.lang.long
     * @author liguang
     * @date 2021/4/20 12:34
     **/
    public long deleteForHash(K key,Object... hashKey){
        return redisTemplate.opsForHash().delete(key,hashKey);
    }

    /**
     * 根据redis key获取hash所有值
     * @param key key
     * @return java.util.Map<java.lang.Object,java.lang.Object>
     * @author liguang
     * @date 2021/4/20 12:33
     **/
    public Map<Object,Object> getForHashEntries(K key){
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * 设置set存储,支持设置多个value的传入
     * @param key    key
     * @param values value
     * @return long
     * @author liguang
     * @date 2021/4/20 12:35
     **/
    public long setForSet(K key,V... values){
        return Optional.ofNullable(redisTemplate.opsForSet().add(key,values)).orElse(0L);
    }

    /**
     * 根据redis key和判断value是否存在set集合中
     * @param key   redis key
     * @param value value
     * @return java.lang.boolean
     * @author liguang
     * @date 2021/4/20 12:34
     **/
    public boolean checkInSet(K key,Object value){
        return Optional.ofNullable(redisTemplate.opsForSet().isMember(key,value)).orElse(false);
    }

    /**
     * 根据key获取set集合全部内容
     * @param key key
     * @return java.util.Set<V>
     * @author liguang
     * @date 2021/5/17 11:55
     **/
    public Set<V> getForSet(K key){
        return redisTemplate.opsForSet().members(key);
    }

    /**
     * 根据key获取集合数据条数
     * @param key key
     * @return long
     * @author liguang
     * @date 2021/5/17 11:56
     **/
    public long getSizeForSet(K key){
        return Optional.ofNullable(redisTemplate.opsForSet().size(key)).orElse(0L);
    }

    /**
     * 根据redis key删除set集合指定元素,value可以为多个
     * @param key    key
     * @param values values
     * @return long
     * @author liguang
     * @date 2021/4/20 12:33
     **/
    public long removeFromSet(K key,Object... values){
        return Optional.ofNullable(redisTemplate.opsForSet().remove(key,values)).orElse(0L);
    }

    /**
     * 根据key获取list集合里的大小
     * @param key key
     * @return java.lang.Long
     * @author liguang
     * @date 2021/4/21 10:54
     **/
    public Long getListSize(K key){
        return redisTemplate.opsForList().size(key);
    }

    /**
     * 在集合的最左边(前面)插入数据,并且返回当前集合的size,插入失败返回-1
     * @param key   key
     * @param value value
     * @return java.lang.Long
     * @author liguang
     * @date 2021/4/21 11:13
     **/
    public Long leftPush(K key,V value){
        return redisTemplate.opsForList().leftPush(key,value);
    }

    /**
     * 在某个值的左边(前面)插入数据,把value值放到集合中指定的pivot的前面,如果pivot存在的话。如果不存在则插入失败,并且返回-1
     * @param key   key
     * @param pivot 参考值
     * @param value value
     * @return Long 当前集合size
     * @author liguang
     * @date 2021/4/21 10:59
     **/
    public Long leftPush(K key,V pivot,V value){
        return redisTemplate.opsForList().leftPush(key,pivot,value);
    }

    /**
     * 在集合的最左边(前面)插入数据,注意集合的顺序,index在前面的先插入,也就是说如果集合是[1,2,3],插入以后变成[3,2,1]
     * <p>
     * 返回当前集合的size,-1表示插入失败
     * @param key    key
     * @param values values
     * @return java.lang.Long 当前集合的size,-1表示插入失败
     * @author liguang
     * @date 2021/4/21 11:13
     **/
    public Long leftPushAll(K key,V... values){
        return redisTemplate.opsForList().leftPushAll(key,values);
    }

    /**
     * 在集合的最左边(前面)插入数据,注意集合的顺序,index在前面的先插入,也就是说如果集合是[1,2,3],插入以后变成[3,2,1]
     * <p>
     * 返回当前集合的size,-1表示插入失败
     * @param key    key
     * @param values values--集合数据
     * @return java.lang.Long
     * @author liguang
     * @date 2021/4/21 11:13
     **/
    public Long leftPushAll(K key,Collection<V> values){
        return redisTemplate.opsForList().leftPushAll(key,values);
    }

    /**
     * 在集合的最右边(后面)插入数据,并且返回当前集合的size,插入失败返回-1
     * @param key   key
     * @param value value
     * @return java.lang.Long
     * @author liguang
     * @date 2021/4/21 11:13
     **/
    public Long rightPush(K key,V value){
        return redisTemplate.opsForList().rightPush(key,value);
    }

    /**
     * 在某个值的最右边(后面)插入数据,把value值放到集合中指定的pivot的后面,如果pivot存在的话。如果不存在则插入失败,并且返回-1
     * @param key   key
     * @param pivot 参考值
     * @param value value
     * @return Long 当前集合size
     * @author liguang
     * @date 2021/4/21 10:59
     **/
    public Long rightPush(K key,V pivot,V value){
        return redisTemplate.opsForList().leftPush(key,pivot,value);
    }

    /**
     * 在集合的最右边(后面)插入数据,按顺序插入
     * <p>
     * 返回当前集合的size,-1表示插入失败
     * @param key    key
     * @param values values
     * @return java.lang.Long 当前集合的size,-1表示插入失败
     * @author liguang
     * @date 2021/4/21 11:13
     **/
    public Long rightPushAll(K key,V... values){
        return redisTemplate.opsForList().rightPushAll(key,values);
    }

    /**
     * 在集合的最右边(后面)插入数据,按顺序插入
     * <p>
     * 返回当前集合的size,-1表示插入失败
     * @param key    key
     * @param values values--集合数据
     * @return java.lang.Long
     * @author liguang
     * @date 2021/4/21 11:13
     **/
    public Long rightPushAll(K key,Collection<V> values){
        return redisTemplate.opsForList().rightPushAll(key,values);
    }

    /**
     * 从集合的最左边(前面)当中取出元素
     * @param key key
     * @return V
     * @author liguang
     * @date 2021/4/21 12:29
     **/
    public V leftPop(K key){
        return redisTemplate.opsForList().leftPop(key);
    }

    /**
     * 从集合的最右边(后面)当中取出元素
     * @param key key
     * @return V
     * @author liguang
     * @date 2021/4/21 12:29
     **/
    public V rightPop(K key){
        return redisTemplate.opsForList().rightPop(key);
    }

    /**
     * 根据key获取集合数据--因为先查找范围才返回数据,是两个操作,不能保证数据完全准确性,如需要保证数据完整需要通过lua脚本
     * @param key key
     * @return java.util.List<V>
     * @author liguang
     * @date 2021/4/21 11:28
     **/
    public List<V> getForList(K key){
        Long size = getListSize(key);
        return size == null || size == 0 ? new ArrayList<>() : getForList(key,0,size);
    }

    /**
     * 根据key,获取list下标范围集合的数据
     * @param key   key
     * @param start 开始下标
     * @param end   结束下标
     * @return java.util.List<V>
     * @author liguang
     * @date 2021/4/21 11:29
     **/
    public List<V> getForList(K key,long start,long end){
        return redisTemplate.opsForList().range(key,start,end);
    }

    /**
     * 根据key获取指定集合下标数据,注意:如果不存在会返回null
     * @param key   key
     * @param index 下标
     * @return V
     * @author liguang
     * @date 2021/4/21 11:30
     **/
    public V index(K key,long index){
        return redisTemplate.opsForList().index(key,index);
    }
}

五、集群架构

举报

相关推荐

0 条评论