0
点赞
收藏
分享

微信扫一扫

redis的发布/订阅(命令、普通工程、springboot实现)


目录

  • ​​一、背景介绍​​
  • ​​二、思路&方案​​
  • ​​三、过程​​
  • ​​redis发布/订阅官方讲解​​
  • ​​命令行实现​​
  • ​​普通工程实现​​
  • ​​springboot工程集成实现​​
  • ​​springboot工程集成改造实现动态点对点、广播​​
  • ​​四、总结​​
  • ​​五、升华​​

一、背景介绍

公司最近有业务需求如下:

  • 1.小美老师给五年级三班上数学课的时候,实现给所在班级进行实时推送数学课程的活动(广播通信)
  • 2.小明在上课的时候给小红的评论进行了点赞,此时小红会收到小明给你点赞这样的通知(点对点通信)

二、思路&方案

  • 基于背景中的需求,我想到了redis的发布/订阅模式;后端向前端发消息使用websocket建立长链接就好
  • 这里只介绍redis的发布/订阅模式实现
  • 通过命令、普通工程集成、springboot实现三种方式进行实现理解

三、过程

redis发布/订阅官方讲解

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 客户端可以订阅任意数量的频道。

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

redis的发布/订阅(命令、普通工程、springboot实现)_spring

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

redis的发布/订阅(命令、普通工程、springboot实现)_spring boot_02

命令行实现

1.以下实例演示了发布订阅是如何工作的。在我们实例中我们创建了订阅频道名为 redisChat:

redis 127.0.0.1:6379> SUBSCRIBE redisChat

Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1

2.现在,我们先重新开启个 redis 客户端,然后在同一个频道 redisChat 发布两次消息,订阅者就能接收到消息。

redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"

(integer) 1

redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by w3cschool.cc"

(integer) 1

# 订阅者的客户端会显示如下消息
1) "message"
2) "redisChat"
3) "Redis is a great caching technique"
1) "message"
2) "redisChat"
3) "Learn redis by w3cschool.cc"

普通工程实现

1.pom文件引入redis包

<!--        redis的发布订阅-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

2.发布者类Publisher

package com.b0022redis发布订阅;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
* 发布者类
*/
public class Publisher extends Thread{

private final JedisPool jedisPool;

public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}

@Override
public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接
while (true) {
String line = null;
try {
line = reader.readLine();
if (!"quit".equals(line)) {
jedis.publish("mychannel", line); //从 mychannel 的频道上推送消息
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

3.订阅者类SubThread

package com.b0022redis发布订阅;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
* 订阅者类
*/
public class SubThread extends Thread {

private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber();

private final String channel = "mychannel";

public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
}

@Override
public void run() {
System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
Jedis jedis = null;
try {
jedis = jedisPool.getResource(); //取出一个连接
jedis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名
} catch (Exception e) {
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}

4.消息监听回调类Subscriber

package com.b0022redis发布订阅;

import redis.clients.jedis.JedisPubSub;

/**
* redis消息监听回调类
*/
public class Subscriber extends JedisPubSub {

public Subscriber(){}
@Override
public void onMessage(String channel, String message) { //收到消息会调用
System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
}
@Override
public void onSubscribe(String channel, int subscribedChannels) { //订阅了频道会调用
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅 会调用
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));

}
}

5.客户端类Client

package com.b0022redis发布订阅;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
* redis 发布订阅java版本,目前是一个订阅,一个发布
*
*/
public class Client {
public static void main( String[] args )
{
// 连接redis服务端
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);

System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379));

SubThread subThread = new SubThread(jedisPool); //订阅者
subThread.start();

Publisher publisher = new Publisher(jedisPool); //发布者
publisher.start();
}
}

springboot工程集成实现

1.pom文件引入的包

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- <scope>compile</scope>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>

2.监听实现类CatListener

package com.mark;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
* 监听发送的消息
*/
@Component
public class CatListener extends MessageListenerAdapter implements MessageListener {

@Autowired
RedisTemplate redisTemplate;

@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("我是Cat监听" + message.toString());
}
}

3.redis消息配置,添加监听类RedisMessageConfig

package com.mark;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;


@Configuration
public class RedisMessageConfig {


/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
CatListener catAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅了一个叫chat 的通道
container.addMessageListener(catAdapter, new PatternTopic("cat"));
//这个container 可以添加多个 messageListener
return container;
}

/**
* redis 读取内容的template
*/
@Bean
StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);

//定义value的序列化方式
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);


template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}

4.发送者类TestController

package com.mark;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class TestController {

@Resource
StringRedisTemplate stringRedisTemplate;

@Resource
RedisMessageListenerContainer container;

@GetMapping("cat")
public void sendCatMessage() {
stringRedisTemplate.convertAndSend("cat", "猫");
}

}

5.配置文件application.yml

server:
port: 8080
spring:
redis:
host: 127.0.0.1
database: 12
password:
port: 6379

6.启动类Client

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;


@SpringBootApplication
@ComponentScan("com.mark")
public class Client {
public static void main(String[] args) {
SpringApplication.run(Client.class, args);
}

}

springboot工程集成改造实现动态点对点、广播

1.pom文件引入的包

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>

2.监听实现类StudentListener

package com.mark;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
* 监听发送的消息
*/
@Component
public class StudentListener extends MessageListenerAdapter implements MessageListener {

private String id;
private String name;

StudentListener(String id,String name){
this.id = id;
this.name = name;
}
StudentListener(){
}

@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("我是监听者"+this.name+",我的id是:"+this.id+";我收到的消息是:" + message.toString());
}

}

3.redis消息配置,添加监听类RedisMessageConfig

package com.mark;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;


@Configuration
public class RedisMessageConfig {


/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//这个container 可以添加多个 messageListener
return container;
}

/**
* redis 读取内容的template
*/
@Bean
StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);

//定义value的序列化方式
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);


template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}

4.添加监听、移除监听、发送消息类TestController

package com.mark;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;

@RestController
public class TestController {

ConcurrentHashMap<String,StudentListener> map = new ConcurrentHashMap<>();

@Resource
StringRedisTemplate stringRedisTemplate;

@Resource
RedisMessageListenerContainer container;

@GetMapping("pushMany")
public String pushMany(@RequestParam(value="courseId") String courseId,
@RequestParam(value="classId") String classId,
@RequestParam(value="message") String message) {
stringRedisTemplate.convertAndSend(courseId+"/"+classId, message);
return "广播发送成功";
}

@GetMapping("pushOne")
public String pushOne(@RequestParam(value="id") String id,
@RequestParam(value="message") String message) {
stringRedisTemplate.convertAndSend(id, message);
return "点对点发送成功";
}

@GetMapping("addListener")
public String addListener(@RequestParam(value="courseId") String courseId,
@RequestParam(value="classId") String classId,
@RequestParam(value="id") String id,
@RequestParam(value="name") String name){
if(map.containsKey(id)){
return name+"已经添加过监听";
}else {
StudentListener studentListener = new StudentListener(id,name);
container.addMessageListener(studentListener,new PatternTopic(courseId+"/"+classId));
container.addMessageListener(studentListener,new PatternTopic(id));
map.put(id,studentListener);
}
return name + "监听添加成功";

}

@GetMapping("removeListener")
public String removeListener(@RequestParam(value="courseId") String courseId,
@RequestParam(value="classId") String classId,
@RequestParam(value="id") String id,
@RequestParam(value="name") String name){
if(map.containsKey(id)){
container.removeMessageListener(map.get(id));
map.remove(id);
}else {
return name + "没有进行监听,无须移除";
}
return name + "移除监听成功";
}
}

5.配置文件application.yml

server:
port: 8080
spring:
redis:
host: 127.0.0.1
database: 12
password:
port: 6379

6.启动类Client

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;


@SpringBootApplication
@ComponentScan("com.mark")
public class Client {
public static void main(String[] args) {
SpringApplication.run(Client.class, args);
}

}

7.实现的效果

redis的发布/订阅(命令、普通工程、springboot实现)_java_03

四、总结

  • 1.通过对redis的发布/订阅的多场景分析,不同代码的实现,对于如何运用有了更加明确的理解
  • 2.以后再有类似的需求和框架明确了着力点;先找最本质的逻辑,再一点点去包装
  • 3.之前就陷入到和框架死磕的结果上,其中涉及到的封装层比较多,看起来就比较乱
  • 4.后续还需要再去查阅redis发布/订阅的内部实现是如何进行的?
  • 5.后续还需要结合websocket进行针对性的研究,从netty的角度来查看长链接通信

五、升华

  • 1.通过这个例子的整理,对于道和术的层面有了更加深刻的理解,在这里道就是要先了解它本质,然后再通过术一步步包装进行实现
  • 2.举一反三这种自信心的增加,一件事真正从道的角度去理解了,所谓术的层面就相当容易了

注:引用文章
​​​https://www.redis.net.cn/tutorial/3514.html​​​


举报

相关推荐

0 条评论