0
点赞
收藏
分享

微信扫一扫

websocket统一分布式集群通信设计

寒羽鹿 2022-07-12 阅读 22


1.对面向接口设计的个人理解

1.1 接口的核心作用:

websocket统一分布式集群通信设计_spring

1.2 抽象类的核心作用:

websocket统一分布式集群通信设计_redis_02

1.3 如何进行抽象设计

  • 将要解决的是一类问题,而非一个问题
  • 对这一类问题,至少掌握两种及以上的通用处理方式;
  • 将通用的部分抽离成为框架(骨架);
  • 至少提供一个及多个不同层次的抽象类,使得流程健全可用;

2.websocket集群通信抽象设计

websocket统一分布式集群通信设计_ide_03

3.抽象代码设计与实现

3.1 代码组织结构

websocket统一分布式集群通信设计_spring_04

3.2 代码抽象设计-对内

import javax.websocket.Session;

/**
* @author automannn@163.com
* @time 2020/10/26 9:24
*/
public interface WsService {

/**
* 剔除无用连接
* @param key
* @return
*/
boolean eliminateDeadConnection(WsSessionKey key);

/**
* 接收来自连接的消息
* @param message
* @param session
*/
void receiveMessage(String message,Session session);

/**
* 发送消息
* @param message
* @param key
*/
void sendMessage(WsMessage message,WsSessionKey key);

/**
* 发送消息
* @param message
*/
void sendMessage(WsMessage message);

void sendToLocal(WsMessage message);

}

/**
* @author automannn@163.com
* @time 2020/10/26 9:18
*/
public interface WsClusterable {
/**
* 发送消息到集群
* @param msg 消息对象
* @return 是否发送成功
*/
boolean sendToCluster(String msg);

/**
* 当前环境是否是集群环境
* @return
*/
String getClusterable();
}

import com.alibaba.fastjson.JSON;
import xxx.xxx.xxx.core.websocket.outer.WsPublisher;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.Assert;

import javax.websocket.Session;

/**
* @author automannn@163.com
* @time 2020/10/27 9:12
*/
public abstract class AbstractClusterableWsService implements WsService,WsClusterable {

protected static final WsSessionCache<WsSessionKey,Session> clients = new WsSessionCache<>();

public AbstractClusterableWsService() {
}

protected abstract WsPublisher getPublisher();

protected abstract String getTopic();

@Override
public boolean eliminateDeadConnection(WsSessionKey key) {
clients.remove(key);
return true;
}

@Override
public void sendMessage(WsMessage message, WsSessionKey key) {
Assert.isTrue(message!=null&&key!=null,"参数非法");
key.setUserId(message.getReceiver());
if (clients.get(key)!=null){
sendToLocal(message,key);
}else {
if (Boolean.valueOf(getClusterable())){
boolean flag= sendToCluster(new WsMessageBuilder(message).get());
if (!flag){
//todo: 是否进行补偿?
}
}else {
}
}
}

@Override
public void sendMessage(WsMessage message) {
Assert.isTrue(message!=null,"参数不能为空");
Assert.isTrue(StringUtils.isNotEmpty(message.getReceiver()),"参数有误");
this.sendMessage(message,new WsSessionKey(message.getReceiver()));
}

@Override
public boolean sendToCluster(String msg) {
WsPublisher publisher= getPublisher();
Assert.isTrue(publisher!=null,"parameter cannot be null!");
String topic = getTopic();
Assert.isTrue(StringUtils.isNotEmpty(topic),"parameter cannot be null!");
return publisher.publish(topic,msg);
}

public void sendToLocal(WsMessage message){
sendToLocal(message,new WsSessionKey(null,message.getReceiver()));
}

private void sendToLocal(WsMessage message,WsSessionKey key){
String textMessage = JSON.toJSONString(message);
Session session = clients.get(key);
if (session!=null) session.getAsyncRemote().sendText(textMessage);
}

}

为完成抽象流程,需要提供的支撑类:

import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.concurrent.ConcurrentHashMap;

/**
* @author automannn@163.com
* @time 2020/11/4 9:36
*/
public class WsSessionCache<K,V> extends ConcurrentHashMap<K, V> {

@Override
public V get(Object key) {
Assert.isInstanceOf(WsSessionKey.class,key,"illegal parameters");
WsSessionKey theKey = (WsSessionKey) key;
Assert.isTrue(theKey!=null,"parameter cannot be null!");
//通过遍历查找用户
if (StringUtils.isEmpty(theKey.getWsSessionId())){
if (StringUtils.isEmpty(theKey.getUserId())) return null;
for (Entry entry:entrySet()){
WsSessionKey targetKey= (WsSessionKey) entry.getKey();
if (theKey.equals(targetKey)){
return (V) entry.getValue();
}
}
return null;
}else {
return super.get(key);
}
}
}

import java.io.Serializable;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.springframework.util.Assert;

import javax.websocket.Session;

/**
* @author automannn@163.com
* @time 2020/10/27 9:48
*/
public class WsSessionKey implements Serializable {
private static final long serialVersionUID = 1;

//wsSession的id号,递增
private String wsSessionId;

//系统内用户id号
private String userId;

public WsSessionKey(String id,String userId){
this.wsSessionId = id;
this.userId = userId;
}

public WsSessionKey(Session session){
Assert.isTrue(session!=null,"parameter cannot be null!");
this.wsSessionId = session.getId();
}

public WsSessionKey(String receiver){
Assert.isTrue(StringUtils.isNotEmpty(receiver),"parameter cannot be null!");
this.userId = receiver;
}

public String getWsSessionId() {
return wsSessionId;
}

public void setWsSessionId(String wsSessionId) {
this.wsSessionId = wsSessionId;
}

public String getUserId() {
return userId;
}

public void setUserId(String userId) {
this.userId = userId;
}

@Override
public boolean equals(Object obj) {
//obj参数代表 表中的目标对象
if (!(obj instanceof WsSessionKey)) return false;
String targetId = ((WsSessionKey)obj).getWsSessionId();
String targetUserId = ((WsSessionKey)obj).getUserId();
//既包括连接号,又包括用户 id ======> 本地消息发送
if (StringUtils.isEmpty(targetId) && StringUtils.isEmpty(targetId)){
if (this.wsSessionId.equals(targetId)&&this.userId.equals(targetUserId)) return true;
}
//当目标对象中不含 用户id号时,此时连接将失效, ========>剔除无效连接
if (StringUtils.isEmpty(this.userId)){
if (targetId.equals(this.wsSessionId)) return true;
//当目标对象中 不含 连接id号时, =====>集群信息发送
}else{
if (targetUserId.equals(this.userId)) return true;
}
return false;
}

@Override
public int hashCode() {
return Objects.hashCode(wsSessionId);
}
}

import java.io.Serializable;

/**
* @author automannn@163.com
* @time 2020/10/27 9:25
*/
public class WsMessage implements Serializable {
private static final long serialVersionUID = 1;
//发送者
private String sender;
//接收者
private String receiver;
//标题
private String title;
//内容
private String content;
//业务内容
private String bizContent;

public WsMessage() {
}

public String getSender() {
return sender;
}

public void setSender(String sender) {
this.sender = sender;
}

public String getReceiver() {
return receiver;
}

public void setReceiver(String receiver) {
this.receiver = receiver;
}

public String getTitle() {
return title;
}

public void setTitle(String title) {
this.title = title;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}

public String getBizContent() {
return bizContent;
}

public void setBizContent(String bizContent) {
this.bizContent = bizContent;
}
}

3.3 抽象代码设计-对外

import org.springframework.beans.factory.InitializingBean;

/**
* @author automannn@163.com
* @time 2020/10/27 9:08
*/
public interface WsSubscriber extends InitializingBean {
void subscribe();

String getTopic();

Object getSubscribeHolder();
}

/**
* @author automannn@163.com
* @time 2020/10/27 9:08
*/
public interface WsPublisher {

/**
* 发布消息
* @param topic
* @param message
* @return
*/
boolean publish(String topic,String message);
}

import com.yinhai.msg.send.core.websocket.inner.WsMessage;

/**
* @author chenkh
* @time 2020/10/27 10:48
*/
public interface WsMessageListener {
boolean receiveMessage(String topic,WsMessage message);

void dispatchMessage(WsMessage wsMessage);
}

4.具体渠道设计(设计者与使用者,均可在遵循骨架的情况下进行扩展)

4.1 rabbitmq渠道实现

import xxx.xxx.xxx.core.websocket.outer.WsSubscriber;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.util.Assert;

/**
* @author automannn@163.com
* @time 2020/11/2 15:51
*/
public class WsRabbitmqSubscriber extends SimpleMessageListenerContainer implements WsSubscriber {

private MessageListener messageListener;

Object subscribeHolder;

private String topic;

public WsRabbitmqSubscriber(MessageListener messageListener, Object subscribeHolder, String topic) {
super((ConnectionFactory) subscribeHolder);
Assert.isTrue(messageListener!=null,"parameter cannot be null!");
this.messageListener = messageListener;
this.subscribeHolder = subscribeHolder;
this.topic = topic;
}

@Override
public void subscribe() {
getSubscribeHolder().setupMessageListener(this.messageListener);
}

@Override
public String getTopic() {
return this.topic;
}

@Override
public MessageListenerContainer getSubscribeHolder() {
return (MessageListenerContainer) this.subscribeHolder;
}
}

import xxx.xxx.xxx.core.websocket.outer.WsPublisher;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;

/**
* @author automannn@163.com
* @time 2020/11/2 15:51
*/
public class WsRabbitmqPublisher implements WsPublisher {

public static final String WS_TOPIC = "xxx.websocket.topic";
private RabbitTemplate rabbitTemplate;

private Environment environment;

public WsRabbitmqPublisher(RabbitTemplate rabbitTemplate,Environment environment) {
Assert.isTrue(rabbitTemplate!=null,"This parameter cannot be null!");
Assert.isTrue(environment!=null,"This parameter cannot be null!");
this.rabbitTemplate = rabbitTemplate;
this.environment = environment;
}

@Override
public boolean publish(String exchange, String message) {
//这里的topic,相当于交换机 的概念
String topic = environment.getProperty(WS_TOPIC);
rabbitTemplate.convertAndSend(exchange, topic+".*",message);
return true;
}
}

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import xxx.xxx.xxx.core.websocket.inner.WsMessage;
import xxx.xxx.xxx.core.websocket.inner.WsService;
import xxx.xxx.xxx.core.websocket.outer.WsMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.util.Assert;

import java.nio.charset.StandardCharsets;

/**
* @author automannn@163.com
* @time 2020/11/2 15:51
*/
public class WsRabbitmqMessageListener implements WsMessageListener, ChannelAwareMessageListener {

private WsService wsService;

public WsRabbitmqMessageListener(WsService wsService) {
Assert.isTrue(wsService!=null,"parameter cannot be null!");
this.wsService = wsService;
}

@Override
public boolean receiveMessage(String topic, WsMessage message) {
//todo:do something
return true;
}

@Override
public void dispatchMessage(WsMessage wsMessage) {
wsService.sendToLocal(wsMessage);
}

@Override
public void onMessage(Message message, Channel channel) {
String topic= message.getMessageProperties().getConsumerQueue();
byte[] body = message.getBody();
String wsMessageStr = new String(body, StandardCharsets.UTF_8);
WsMessage wsMessage= JSON.parseObject(wsMessageStr,WsMessage.class);
receiveMessage(topic,wsMessage);
dispatchMessage(wsMessage);
}
}

4.2 rocketmq渠道实现

import xxx.xxx.xxx.core.websocket.outer.WsSubscriber;
import xxx.xxx.xxx.core.exception.AppException;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.util.Assert;

/**
* @author automannn@163.com
* @time 2020/11/2 15:11
*/
public class WsRocketmqSubscriber implements WsSubscriber {
public static final String SPLIT_FLAG = "~";
private MessageListenerConcurrently messageListener;

Object subscribeHolder;

private String topic;

public WsRocketmqSubscriber(MessageListenerConcurrently messageListener, Object subscribeHolder, String topic) {
Assert.isTrue(messageListener!=null,"parameter cannot be null!");
Assert.isInstanceOf(DefaultMQPushConsumer.class,subscribeHolder,"illegal parameter!");
this.messageListener = messageListener;
this.subscribeHolder = subscribeHolder;
this.topic = topic;
}

@Override
public void subscribe() {
getSubscribeHolder().registerMessageListener(this.messageListener);
getSubscribeHolder().setMessageModel(MessageModel.BROADCASTING);
try {
String topic = getTopic();
String[] topicArr= topic.split(SPLIT_FLAG);
getSubscribeHolder().subscribe(topicArr[0],topicArr[1]);
getSubscribeHolder().start();
} catch (MQClientException e) {
throw new AppException("rocketmq subscribe failed!");
}
}

@Override
public String getTopic() {
return this.topic;
}

@Override
public DefaultMQPushConsumer getSubscribeHolder() {
return (DefaultMQPushConsumer) this.subscribeHolder;
}

@Override
public void afterPropertiesSet() throws Exception {
subscribe();
}
}

import xxx.xxx.xxx.core.websocket.outer.WsPublisher;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.springframework.util.Assert;

/**
* @author automannn@163.com
* @time 2020/11/2 15:11
*/
public class WsRocketmqPublisher implements WsPublisher {
private DefaultMQProducer producer;

public WsRocketmqPublisher(DefaultMQProducer producer) {
Assert.isTrue(producer!=null,"This parameter cannot be null!");
this.producer = producer;
}

@Override
public boolean publish(String topic, String message) {
Message sendMsg = new Message(topic,"*",message.getBytes());
try {
SendResult sendResult = producer.send(sendMsg);
if (sendResult.getSendStatus()== SendStatus.SEND_OK){
return true;
}else {
return false;
}
} catch (Exception e) {
return false;
}
}
}

import com.alibaba.fastjson.JSON;
import xxx.xxx.xxx.core.websocket.inner.WsMessage;
import xxx.xxx.xxx.core.websocket.inner.WsService;
import xxx.xxx.xxx.core.websocket.outer.WsMessageListener;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
* @author automannn@163.com
* @time 2020/11/2 15:11
*/
public class WsRocketmqMessageListener implements MessageListenerConcurrently, WsMessageListener {
public static final Logger LOGGER = LoggerFactory.getLogger(WsRocketmqMessageListener.class);

private WsService wsService;

public WsRocketmqMessageListener(WsService wsService) {
Assert.isTrue(wsService!=null,"parameter cannot be null!");
this.wsService = wsService;
}

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

if (CollectionUtils.isEmpty(list)){
LOGGER.info("mq接收消息为空,直接返回成功");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = list.get(0);
LOGGER.info("mq接收到的消息为:"+messageExt.toString());

try{
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(),"utf-8");

LOGGER.info("mq消息topic={},tags={},消息内容={}",topic,tags,body);
WsMessage wsMessage= JSON.parseObject(body,WsMessage.class);
receiveMessage(topic,wsMessage);
dispatchMessage(wsMessage);
} catch (UnsupportedEncodingException e) {
LOGGER.error("获取mq消息内容异常!");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

@Override
public boolean receiveMessage(String topic, WsMessage message) {
//todo:do some thing
return true;
}

@Override
public void dispatchMessage(WsMessage wsMessage) {
wsService.sendToLocal(wsMessage);
}
}

4.3 redis渠道实现

import xxx.xxx.xxx.core.websocket.outer.WsSubscriber;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.List;

/**
* @author automannn@163.com
* @time 2020/10/27 10:40
*/
public class WsRedisSubscriber implements WsSubscriber{

private MessageListener messageListener;

Object subscribeHolder;

private String topic;

public WsRedisSubscriber(RedisMessageListenerContainer subscribeHolder,MessageListener messageListener,String topic){
Assert.isTrue(messageListener!=null,"parameter cannot be null!");
Assert.isInstanceOf(RedisMessageListenerContainer.class,subscribeHolder,"illegal parameter!");
this.topic = topic;
this.messageListener = messageListener;
this.subscribeHolder = subscribeHolder;
}

@Override
public void subscribe() {
Assert.isTrue(StringUtils.isNotEmpty(topic),"parameter cannot be null!");
Assert.isTrue(getSubscribeHolder()!=null,"parameter cannot be null!");
List<Topic> topicList = new ArrayList<>();
topicList.add(new PatternTopic(topic));
getSubscribeHolder().addMessageListener(listenerAdapter(),topicList);
}

private MessageListenerAdapter listenerAdapter(){
return new MessageListenerAdapter(this.messageListener);
}


@Override
public String getTopic() {
return this.topic;
}

@Override
public RedisMessageListenerContainer getSubscribeHolder() {
return (RedisMessageListenerContainer) this.subscribeHolder;
}

@Override
public void afterPropertiesSet() throws Exception {
subscribe();
}
}

import com.alibaba.fastjson.JSON;
import xxx.xxx.xxx.core.websocket.inner.WsMessage;
import xxx.xxx.xxx.core.websocket.outer.WsPublisher;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.Assert;

/**
* @author automannn@163.com
* @time 2020/10/27 10:34
*/
public class WsRedisPublisher implements WsPublisher {

private RedisTemplate redisTemplate;

public WsRedisPublisher(RedisTemplate redisTemplate){
Assert.isTrue(redisTemplate!=null,"This parameter cannot be null!");
this.redisTemplate = redisTemplate;
}

@Override
public boolean publish(String topic, String message) {
//由于redis在存储的时候,自身做了序列化与反序列化的操作,这里选择存入对象 方便消费者使用
WsMessage wsMessage = JSON.parseObject(message,WsMessage.class);
redisTemplate.convertAndSend(topic,wsMessage);
return true;
}
}

import xxx.xxx.xxx.core.websocket.inner.WsMessage;
import xxx.xxx.xxx.core.websocket.inner.WsService;
import xxx.xxx.xxx.core.websocket.outer.WsMessageListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.util.Assert;

/**
* @author automannn@163.com
* @time 2020/10/27 11:00
*/
public class WsRedisMessageListener implements WsMessageListener, MessageListener {

private WsService wsService;

private RedisTemplate redisTemplate;

public WsRedisMessageListener(WsService wsService, RedisTemplate redisTemplate) {
Assert.isTrue(wsService!=null,"parameter cannot be null!");
Assert.isTrue(redisTemplate!=null,"parameter cannot be null!");
this.wsService = wsService;
this.redisTemplate = redisTemplate;
}

@Override
public boolean receiveMessage(String topic, WsMessage message) {
//todo: dosomething
return true;
}

@Override
public void dispatchMessage(WsMessage wsMessage) {
Assert.isTrue(wsMessage!=null,"parameter cannot be null!");
wsService.sendToLocal(wsMessage);
}

@Override
public void onMessage(Message message, byte[] bytes) {
String topic= new String(message.getChannel());
RedisSerializer redisSerializer= redisTemplate.getDefaultSerializer();
WsMessage wsMessage= (WsMessage) redisSerializer.deserialize(message.getBody());
receiveMessage(topic,wsMessage);
dispatchMessage(wsMessage);
}

}

5.基于springBoot的配置设计

5.1 代码层面渠道bean配置示例

import xxx.xxx.xxx.core.websocket.inner.WsService;
import xxx.xxx.xxx.core.websocket.outer.WsPublisher;
import xxx.xxx.xxx.core.websocket.outer.WsSubscriber;
import xxx.xxx.xxx.core.websocket.outer.rabbitMqImpl.WsRabbitmqMessageListener;
import xxx.xxx.xxx.core.websocket.outer.rabbitMqImpl.WsRabbitmqPublisher;
import xxx.xxx.xxx.core.websocket.outer.rabbitMqImpl.WsRabbitmqSubscriber;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import javax.annotation.Resource;

/**
* @author automannn@163.com
* @time 2020/11/3 9:53
*/
@Configuration
@ConditionalOnClass(EnableRabbit.class)
@ConfigurationProperties(prefix = "xxx.websocket.rabbitmq")
@ConditionalOnProperty(prefix = "xxx.websocket.cluterable",value = "type",havingValue = "rabbitmq")
public class RabbitMqPubsubConfiguration implements EnvironmentAware {
public static final String WS_TOPIC = "msg.websocket.topic";
@Autowired
private WsService wsService;

@Resource
private RabbitTemplate rabbitTemplate;

@Autowired
private CachingConnectionFactory connectionFactory;

private Environment environment;
/*队列名称*/
private String queueName;
/*交换机名称,需要在rabbitmq新建*/
private String exchangeName;
/*每个消费者获取的最大消息数量*/
private int prefetchCount;
/*消费者个数*/
private int concurrentConsumers = 1;

/**
* 队列配置
*/
@Bean
public Queue theQueue(){
return new Queue(this.queueName);
}

/**
* 交换机配置
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(this.exchangeName);
}

/**
* 绑定 严格路由键
*/
@Bean
public Binding bindingTopicExchangeMessage(){
String topic= this.environment.getProperty(WS_TOPIC);
return BindingBuilder.bind(theQueue()).to(topicExchange()).with(topic+".#");
}

@Bean
public WsPublisher wsPublisher(){
return new WsRabbitmqPublisher(rabbitTemplate,this.environment);
}

@Bean
public MessageListener messageListener(){
return new WsRabbitmqMessageListener(wsService);
}

@Bean
public WsSubscriber wsSubscriber(){
WsRabbitmqSubscriber container = new WsRabbitmqSubscriber(messageListener(),connectionFactory,this.queueName);
container.setQueueNames(this.queueName);
container.setExposeListenerChannel(true);
container.setPrefetchCount(this.prefetchCount);//设置每个消费者获取的最大的消息数量
container.setConcurrentConsumers(concurrentConsumers);//消费者个数
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener(messageListener());//监听处理类
return container;
}

public String getQueueName() {
return queueName;
}

public void setQueueName(String queueName) {
this.queueName = queueName;
}

public String getExchangeName() {
return exchangeName;
}

public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}

public int getPrefetchCount() {
return prefetchCount;
}

public void setPrefetchCount(int prefetchCount) {
this.prefetchCount = prefetchCount;
}

public int getConcurrentConsumers() {
return concurrentConsumers;
}

public void setConcurrentConsumers(int concurrentConsumers) {
this.concurrentConsumers = concurrentConsumers;
}

@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
}

5.2 配置文件配置示例

xxx:
websocket:
topic: wsExchange
isCluster: false
cluterable:
type: redis #redis,rabbitmq,rocketmq
rocketmq:
#生产者配置
groupName: ${spring.application.name}
namesrvAddr: 192.168.10.21:9876
maxMessageSize: 4096
sendMsgTimeOut: 3000
#消费者配置
retryTimesWhenSendFailed: 2
topic: ${msg.websocket.topic}~*
consumeThreadMin: 5
consumeThreadMax: 32
consumeMessageBatchMaxSize: 1
rabbitmq:
#rabbitmq的连接配置信息,与spring保持一致
queueName: ${msg.websocket.topic}.${spring.application.name}
exchangeName: ${msg.websocket.topic}
prefetchCount: 1
concurrentConsumers: 1

6.备注

   代码中的包以及配置之类,均做了敏感信息的处理,小朋友请在大朋友的陪同下观看。要讲武德,不要搞骗、搞偷袭.....

举报

相关推荐

0 条评论