0
点赞
收藏
分享

微信扫一扫

精讲23种设计模式-基于观察者模式~设计异步多渠道群发框架


文章目录

  • ​​一、观察者模式​​
  • ​​1. 观察者模式基本概念​​
  • ​​2. 观察者模式的应用场景​​
  • ​​3. 观察者模式的类图​​
  • ​​二、设计异步多渠道群发框架​​
  • ​​2.1. 定义消息观察者抽象接口​​
  • ​​2.2. 创建观察者​​
  • ​​2.3. 主题通知所有观察者​​
  • ​​2.4. 观察者注册​​
  • ​​2.5. 自定义线程池​​
  • ​​2.6. 签单通知入口​​
  • ​​2.6. 异步通知接口测试​​
  • ​​2.7. 依赖​​
  • ​​三、Spring事件通知​​
  • ​​3.1. 定义消息实体类​​
  • ​​3.2. 定义(邮件)事件通知​​
  • ​​3.3. 定义(短信)事件通知​​
  • ​​3.4. 签单同步通知入口​​
  • ​​3.5. 测试效果​​
  • ​​3.6. 开源项目​​
一、观察者模式
1. 观察者模式基本概念

一个对象状态改变,通知给其他所有的对象

2. 观察者模式的应用场景

Zk的事件监听、分布式配置中心刷新配置文件、业务中群发不同渠道消息

3. 观察者模式的类图

精讲23种设计模式-基于观察者模式~设计异步多渠道群发框架_观察者模式

二、设计异步多渠道群发框架
2.1. 定义消息观察者抽象接口

package com.gblfy.observer;

import com.alibaba.fastjson.JSONObject;

/**
* 定义消息观察者(ObServer)抽象接口
*
* @author gblfy
* @date 2022-03-15
*/
public interface GblfyObServer {

void sendMsg(JSONObject jsonObject);
}

2.2. 创建观察者

短信观察者

package com.gblfy.observer.impl;

import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfyObServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
* 定义(短信)事件通知
*
* @author gblfy
* @date 2022-03-15
*/
@Slf4j
@Component
public class SmsObServer implements GblfyObServer {
@Override
@Async("customAsyncThreadPool")
public void sendMsg(JSONObject jsonObject) {
log.info("观察者模式发送->短信-->{}", jsonObject.toJSONString());

}
}

邮件观察者

package com.gblfy.observer.impl;

import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfyObServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
* 定义(邮件)事件通知
*
* @author gblfy
* @date 2022-03-15
*/
@Slf4j
@Component
public class EmailObServer implements GblfyObServer {

@Override
@Async("customAsyncThreadPool")
public void sendMsg(JSONObject jsonObject) {
log.info("观察者模式发送->邮件-->{}",jsonObject.toJSONString());
}
}

2.3. 主题通知所有观察者

package com.gblfy.observer;

import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
* 定义消注册中心,主题通知所有观察者
*
* @author gblfy
* @date 2022-03-15
*/
@Component
public class GblfySubject {

//观察者容器
private List<GblfyObServer> obServerList = new ArrayList<>();

/**
* 添加观察者
*
* @param gblfyObServer
*/
public void addObServer(GblfyObServer gblfyObServer) {
obServerList.add(gblfyObServer);
}

/**
* 通知所有的观察者
*
* @param jsonObject
*/
public void notifyObServer(JSONObject jsonObject) {
obServerList.stream().forEach(p -> p.sendMsg(jsonObject));
}
}

2.4. 观察者注册

项目启动自动注册观察者

package com.gblfy.start;

import com.gblfy.observer.GblfyObServer;
import com.gblfy.observer.GblfySubject;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* 项目启动注册观察者
*
* @author gblfy
* @date 2022-03-15
*/
@Component
public class StartService implements ApplicationRunner, ApplicationContextAware {

@Autowired
private GblfySubject gblfySubject;

//初始化上下文对象
private ApplicationContext applicationContext;

/**
* 项目启动成功注册在观察着集合(ObServer的子类实例)
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
/**
* 根据接口类获取相应bena对象,自动注册观察者
* 1.使用spring获取ObServer下所有子类的bean对象
* 2.将bean对象依次添加到gblfySubject观察者集合中即可
*/
Map<String, GblfyObServer> map = applicationContext.getBeansOfType(GblfyObServer.class);
for (String key : map.keySet()) {
GblfyObServer gblfyObServer = map.get(key);
gblfySubject.addObServer(gblfyObServer);
}
}

//获取上下文
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}

2.5. 自定义线程池

package com.gblfy.config;

import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* @Deacription 自定义异步线程池
* @Author gblfy
* @Date 2022-03-15 10:38
**/
@Component
@EnableAsync
public class AsyncScheduledTaskConfig {

@Bean("customAsyncThreadPool")
public Executor customAsyncThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(100);
//核心线程数
executor.setCorePoolSize(10);
//任务队列的大小
executor.setQueueCapacity(10);
//线程池名的前缀
executor.setThreadNamePrefix("gblfy-signpolicy-asynnotify-");
//允许线程的空闲时间30秒
executor.setKeepAliveSeconds(30);
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(60);

/**
* 拒绝处理策略
* CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
* AbortPolicy():直接抛出异常。
* DiscardPolicy():直接丢弃。
* DiscardOldestPolicy():丢弃队列中最老的任务。
*/
/**
* 特殊说明:
* 1. 这里演示环境,拒绝策略咱们采用抛出异常
* 2.真实业务场景会把缓存队列的大小会设置大一些,
* 如果,提交的任务数量超过最大线程数量或将任务环缓存到本地、redis、mysql中,保证消息不丢失
* 3.如果项目比较大的话,异步通知种类很多的话,建议采用MQ做异步通知方案
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//线程初始化
executor.initialize();
return executor;
}
}

2.6. 签单通知入口

package com.gblfy.controller;

import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfySubject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 签单通知入口
*
* @author gblfy
* @date 2022-03-15
*/
@Slf4j
@RestController
public class SignPolicyController {

@Autowired
private GblfySubject gblfySubject;

/**
* 签单异步通知(短信、邮件等多种方式)
*
* @return
*/
@GetMapping("/signPolicyToAsynNotify")
public String signPolicy() {
log.info("将签单信息保存数据库处理");

JSONObject jsonObject = new JSONObject();
jsonObject.put("sms", "1766666666");
jsonObject.put("email", "1766@163.com");

gblfySubject.notifyObServer(jsonObject);
return "success";
}
}

2.6. 异步通知接口测试

http://localhost:8080/signPolicyToAsynNotify

精讲23种设计模式-基于观察者模式~设计异步多渠道群发框架_观察者模式_02

2.7. 依赖

<!--字符串工具类-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!--数据json处理-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
<!--SpringMVC-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

三、Spring事件通知
3.1. 定义消息实体类

package com.gblfy.entity;

import org.springframework.context.ApplicationEvent;

/**
* 定义消息实体类
*
* @author gblfy
* @date 2022-03-15
*/
public class SignPolicyMsgEntity extends ApplicationEvent {
private String email;
private String phone;
private String userId;

public SignPolicyMsgEntity(Object source) {
super(source);
}

public SignPolicyMsgEntity(Object source, String email, String phone) {
super(source);
this.email = email;
this.phone = phone;
}

@Override
public String toString() {
return "email:" + email + ",phone:" + phone;
}
}

3.2. 定义(邮件)事件通知

package com.gblfy.listener;

import com.gblfy.entity.SignPolicyMsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
* 定义(邮件)事件通知
*
* @author gblfy
* @date 2022-03-15
*/
@Component
@Slf4j
public class EmailListener implements ApplicationListener<SignPolicyMsgEntity> {

@Override
public void onApplicationEvent(SignPolicyMsgEntity event) {
log.info("eamil:->{}", event.toString());
}
}

3.3. 定义(短信)事件通知

package com.gblfy.listener;

import com.gblfy.entity.SignPolicyMsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
* 定义(短信)事件通知
*
* @author gblfy
* @date 2022-03-15
*/
@Component
@Slf4j
public class SmsListener implements ApplicationListener<SignPolicyMsgEntity> {

@Override
public void onApplicationEvent(SignPolicyMsgEntity event) {
log.info("sms:->{}", event.toString());

}

}

3.4. 签单同步通知入口

@Autowired
private ApplicationEventPublisher applicationEventPublisher;

/**
* 签单同步通知(短信、邮件等多种方式)
* 使用spring事件通知
*
* @return
*/
@GetMapping("/signPolicyToSyncNotify")
public String signPolicyToSyncNotify() {
log.info("将签单信息保存数据库处理");

SignPolicyMsgEntity signPolicyMsgEntity = new SignPolicyMsgEntity(this, "1766@163.com", "1766666666");
applicationEventPublisher.publishEvent(signPolicyMsgEntity);
return "success";
}

3.5. 测试效果

http://localhost:8080/signPolicyToSyncNotify

精讲23种设计模式-基于观察者模式~设计异步多渠道群发框架_观察者模式_03

3.6. 开源项目

​​https://gitee.com/gblfy/design-pattern/tree/observer-mode/​​


举报

相关推荐

0 条评论