目录
- 一、背景介绍
- 二、思路&方案
- 三、过程
- redis发布/订阅官方讲解
- 命令行实现
- 普通工程实现
- springboot工程集成实现
- springboot工程集成改造实现动态点对点、广播
- 四、总结
- 五、升华
一、背景介绍
公司最近有业务需求如下:
- 1.小美老师给五年级三班上数学课的时候,实现给所在班级进行实时推送数学课程的活动(广播通信)
- 2.小明在上课的时候给小红的评论进行了点赞,此时小红会收到小明给你点赞这样的通知(点对点通信)
二、思路&方案
- 基于背景中的需求,我想到了redis的发布/订阅模式;后端向前端发消息使用websocket建立长链接就好
- 这里只介绍redis的发布/订阅模式实现
- 通过命令、普通工程集成、springboot实现三种方式进行实现理解
三、过程
redis发布/订阅官方讲解
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
Redis 客户端可以订阅任意数量的频道。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
命令行实现
1.以下实例演示了发布订阅是如何工作的。在我们实例中我们创建了订阅频道名为 redisChat:
redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1
2.现在,我们先重新开启个 redis 客户端,然后在同一个频道 redisChat 发布两次消息,订阅者就能接收到消息。
redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
(integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by w3cschool.cc"
(integer) 1
# 订阅者的客户端会显示如下消息
1) "message"
2) "redisChat"
3) "Redis is a great caching technique"
1) "message"
2) "redisChat"
3) "Learn redis by w3cschool.cc"
普通工程实现
1.pom文件引入redis包
<!-- redis的发布订阅-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
2.发布者类Publisher
package com.b0022redis发布订阅;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* 发布者类
*/
public class Publisher extends Thread{
private final JedisPool jedisPool;
public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接
while (true) {
String line = null;
try {
line = reader.readLine();
if (!"quit".equals(line)) {
jedis.publish("mychannel", line); //从 mychannel 的频道上推送消息
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3.订阅者类SubThread
package com.b0022redis发布订阅;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* 订阅者类
*/
public class SubThread extends Thread {
private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber();
private final String channel = "mychannel";
public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
}
@Override
public void run() {
System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
Jedis jedis = null;
try {
jedis = jedisPool.getResource(); //取出一个连接
jedis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名
} catch (Exception e) {
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
4.消息监听回调类Subscriber
package com.b0022redis发布订阅;
import redis.clients.jedis.JedisPubSub;
/**
* redis消息监听回调类
*/
public class Subscriber extends JedisPubSub {
public Subscriber(){}
@Override
public void onMessage(String channel, String message) { //收到消息会调用
System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
}
@Override
public void onSubscribe(String channel, int subscribedChannels) { //订阅了频道会调用
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅 会调用
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
5.客户端类Client
package com.b0022redis发布订阅;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* redis 发布订阅java版本,目前是一个订阅,一个发布
*
*/
public class Client {
public static void main( String[] args )
{
// 连接redis服务端
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);
System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379));
SubThread subThread = new SubThread(jedisPool); //订阅者
subThread.start();
Publisher publisher = new Publisher(jedisPool); //发布者
publisher.start();
}
}
springboot工程集成实现
1.pom文件引入的包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- <scope>compile</scope>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
2.监听实现类CatListener
package com.mark;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
* 监听发送的消息
*/
@Component
public class CatListener extends MessageListenerAdapter implements MessageListener {
@Autowired
RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("我是Cat监听" + message.toString());
}
}
3.redis消息配置,添加监听类RedisMessageConfig
package com.mark;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Configuration
public class RedisMessageConfig {
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
CatListener catAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅了一个叫chat 的通道
container.addMessageListener(catAdapter, new PatternTopic("cat"));
//这个container 可以添加多个 messageListener
return container;
}
/**
* redis 读取内容的template
*/
@Bean
StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
//定义value的序列化方式
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
4.发送者类TestController
package com.mark;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class TestController {
@Resource
StringRedisTemplate stringRedisTemplate;
@Resource
RedisMessageListenerContainer container;
@GetMapping("cat")
public void sendCatMessage() {
stringRedisTemplate.convertAndSend("cat", "猫");
}
}
5.配置文件application.yml
server:
port: 8080
spring:
redis:
host: 127.0.0.1
database: 12
password:
port: 6379
6.启动类Client
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.mark")
public class Client {
public static void main(String[] args) {
SpringApplication.run(Client.class, args);
}
}
springboot工程集成改造实现动态点对点、广播
1.pom文件引入的包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
2.监听实现类StudentListener
package com.mark;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
* 监听发送的消息
*/
@Component
public class StudentListener extends MessageListenerAdapter implements MessageListener {
private String id;
private String name;
StudentListener(String id,String name){
this.id = id;
this.name = name;
}
StudentListener(){
}
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("我是监听者"+this.name+",我的id是:"+this.id+";我收到的消息是:" + message.toString());
}
}
3.redis消息配置,添加监听类RedisMessageConfig
package com.mark;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Configuration
public class RedisMessageConfig {
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//这个container 可以添加多个 messageListener
return container;
}
/**
* redis 读取内容的template
*/
@Bean
StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
//定义value的序列化方式
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
4.添加监听、移除监听、发送消息类TestController
package com.mark;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;
@RestController
public class TestController {
ConcurrentHashMap<String,StudentListener> map = new ConcurrentHashMap<>();
@Resource
StringRedisTemplate stringRedisTemplate;
@Resource
RedisMessageListenerContainer container;
@GetMapping("pushMany")
public String pushMany(@RequestParam(value="courseId") String courseId,
@RequestParam(value="classId") String classId,
@RequestParam(value="message") String message) {
stringRedisTemplate.convertAndSend(courseId+"/"+classId, message);
return "广播发送成功";
}
@GetMapping("pushOne")
public String pushOne(@RequestParam(value="id") String id,
@RequestParam(value="message") String message) {
stringRedisTemplate.convertAndSend(id, message);
return "点对点发送成功";
}
@GetMapping("addListener")
public String addListener(@RequestParam(value="courseId") String courseId,
@RequestParam(value="classId") String classId,
@RequestParam(value="id") String id,
@RequestParam(value="name") String name){
if(map.containsKey(id)){
return name+"已经添加过监听";
}else {
StudentListener studentListener = new StudentListener(id,name);
container.addMessageListener(studentListener,new PatternTopic(courseId+"/"+classId));
container.addMessageListener(studentListener,new PatternTopic(id));
map.put(id,studentListener);
}
return name + "监听添加成功";
}
@GetMapping("removeListener")
public String removeListener(@RequestParam(value="courseId") String courseId,
@RequestParam(value="classId") String classId,
@RequestParam(value="id") String id,
@RequestParam(value="name") String name){
if(map.containsKey(id)){
container.removeMessageListener(map.get(id));
map.remove(id);
}else {
return name + "没有进行监听,无须移除";
}
return name + "移除监听成功";
}
}
5.配置文件application.yml
server:
port: 8080
spring:
redis:
host: 127.0.0.1
database: 12
password:
port: 6379
6.启动类Client
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.mark")
public class Client {
public static void main(String[] args) {
SpringApplication.run(Client.class, args);
}
}
7.实现的效果
四、总结
- 1.通过对redis的发布/订阅的多场景分析,不同代码的实现,对于如何运用有了更加明确的理解
- 2.以后再有类似的需求和框架明确了着力点;先找最本质的逻辑,再一点点去包装
- 3.之前就陷入到和框架死磕的结果上,其中涉及到的封装层比较多,看起来就比较乱
- 4.后续还需要再去查阅redis发布/订阅的内部实现是如何进行的?
- 5.后续还需要结合websocket进行针对性的研究,从netty的角度来查看长链接通信
五、升华
- 1.通过这个例子的整理,对于道和术的层面有了更加深刻的理解,在这里道就是要先了解它本质,然后再通过术一步步包装进行实现
- 2.举一反三这种自信心的增加,一件事真正从道的角度去理解了,所谓术的层面就相当容易了
注:引用文章
https://www.redis.net.cn/tutorial/3514.html