The session sharing problem in distributed WebSocket real-time communication

捡历史的小木板 2024-05-16 Views: 17

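Contents

1.Requirement

2.Preconditions and requirements

3.Analysis of the options

3.1.Scheme 1: store the shared sessions in Redis

3.2.Scheme 2: use AbstractMongoEventListener from the Mongo lifecycle events

3.3.Scheme 3: introduce an MQ component such as Redis and send broadcast messages

3.4.Scheme 4: synchronize data with change streams

4.Distributed WebSocket message push based on change streams

4.1.Add the MongoDB starter to pom.xml

4.2.Add custom beans to the configuration class

4.3.Implement the listener callback

5.With standalone MongoDB, AbstractMongoEventListener can also be used

6.Recommended documentation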


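1.Requirement

In one sentence: solve the WebSocket session sharing problem in a deployment that runs multiple Spring Boot processes on top of a multi-node MongoDB replica set.

2.Preconditions and requirements

3.Analysis of the options

3.1.Scheme 1: store the shared sessions in Redis

Not feasible.

Reason:

3.2.Scheme 2: use AbstractMongoEventListener from the Mongo lifecycle events

Not feasible.

Reason:

3.3.Scheme 3: introduce an MQ component such as Redis and send broadcast messages

Principle:

Feasible.

Drawbacks:

3.4.Scheme 4: synchronize data with change streams

Since version 3.6, MongoDB can notify subscribers of data changes (change streams), but only on clustered deployments, that is, replica sets and sharded clusters.

Principle:

Feasible.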
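
The idea behind the change stream approach can be sketched without MongoDB at all. The following is a minimal in-memory model, not the article's implementation: `AlarmCollection` stands in for a watched collection, `Node` for one Spring Boot process, and a list of strings for a WebSocket session. All of these names are hypothetical. Every node watches the same collection, so a write made through any node reaches all of them, and each node pushes only to the sessions it holds locally.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class ChangeFeedFanOutSketch {

    /** Hypothetical stand-in for a watched collection: each insert is delivered to every watcher. */
    static class AlarmCollection {
        private final List<Consumer<String>> watchers = new CopyOnWriteArrayList<>();

        void watch(Consumer<String> watcher) {
            watchers.add(watcher);
        }

        /** A write coming from any node; all watching nodes are notified. */
        void insert(String userId) {
            for (Consumer<String> watcher : watchers) {
                watcher.accept(userId);
            }
        }
    }

    /** Hypothetical stand-in for one application process with its own local sessions. */
    static class Node {
        final String name;
        // userId -> messages pushed to that user's session on this node
        final Map<String, List<String>> localSessions = new ConcurrentHashMap<>();

        Node(String name, AlarmCollection collection) {
            this.name = name;
            collection.watch(this::onChange);
        }

        void connect(String userId) {
            localSessions.put(userId, new CopyOnWriteArrayList<>());
        }

        /** Called for every change; delivers only if the user is connected to this node. */
        private void onChange(String userId) {
            List<String> session = localSessions.get(userId);
            if (session != null) {
                session.add("alarm for " + userId + " via " + name);
            }
        }
    }

    public static void main(String[] args) {
        AlarmCollection alarms = new AlarmCollection();
        Node a = new Node("node-a", alarms);
        Node b = new Node("node-b", alarms);

        b.connect("user-1");     // the user's socket lives on node B
        alarms.insert("user-1"); // the alarm may have been written through node A

        System.out.println(b.localSessions.get("user-1"));
        System.out.println(a.localSessions.containsKey("user-1"));
    }
}
```

No session object ever leaves its node in this model; only the data change is shared, which is why the sessions themselves do not need to be serialized or stored anywhere.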

4.Distributed WebSocket message push based on change streams

4.1.Add the MongoDB starter to pom.xml

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

4.2.Add custom beans to the configuration class

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.xxx.listener.AlarmMessageListener;
import org.bson.Document;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;

import java.util.concurrent.Executors;

@Configuration
public class MongoDBConfig {

    // The original listing called a mongoDatabaseFactory() method that is not shown in the article.
    // Here the factory is injected instead, assuming one is available as a bean
    // (for example from Spring Boot auto-configuration).
    @Bean
    public MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) {
        return new MongoTemplate(mongoDatabaseFactory);
    }

    // Created only when db.is-stand-alone=false, i.e. when MongoDB runs as a cluster,
    // because change streams are not available on a standalone instance.
    @Bean
    @ConditionalOnProperty(name = "db.is-stand-alone", havingValue = "false")
    public MessageListenerContainer alarmMessageListenerContainer(MongoTemplate mongoTemplate,
                                                                  ApplicationEventPublisher eventPublisher) {
        AlarmMessageListener messageListener = new AlarmMessageListener(mongoTemplate, eventPublisher);
        return customMessageListenerContainer("alarm", mongoTemplate, messageListener);
    }

    private MessageListenerContainer customMessageListenerContainer(
            String collectionName,
            MongoTemplate template,
            MessageListener<ChangeStreamDocument<Document>, ? super Document> messageListener) {

        // Start the container automatically together with the application context.
        MessageListenerContainer messageListenerContainer =
                new DefaultMessageListenerContainer(template, Executors.newFixedThreadPool(5)) {
                    @Override
                    public boolean isAutoStartup() {
                        return true;
                    }
                };
        ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(messageListener)
                .collection(collectionName) // name of the collection to watch
                // Filter the operation types to listen for; adjust the criteria as needed.
                .filter(Aggregation.newAggregation(Aggregation.match(
                        Criteria.where("operationType").in("insert", "update"))))
                // Without this, an update event carries only the changed fields;
                // UPDATE_LOOKUP makes it return the whole document.
                .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                .build();
        messageListenerContainer.register(request, Document.class);
        return messageListenerContainer;
    }
}
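
The two request options in the configuration, the `operationType` filter and the full document lookup, can be illustrated with a small stand-alone model. This is a sketch under stated assumptions: `ChangeEvent` is a simplified, hypothetical stand-in and not the driver's `ChangeStreamDocument`, and the field names `level` and `deviceId` are made up for the example. It shows that only insert and update events pass the filter, and that an update event without a lookup exposes just the changed fields.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ChangeEventFilterSketch {

    /** Simplified, hypothetical change event; not the MongoDB driver type. */
    static final class ChangeEvent {
        final String operationType;
        final Map<String, Object> updatedFields; // only the fields changed by an update
        final Map<String, Object> fullDocument;  // whole document, or null when not available

        ChangeEvent(String operationType, Map<String, Object> updatedFields, Map<String, Object> fullDocument) {
            this.operationType = operationType;
            this.updatedFields = updatedFields;
            this.fullDocument = fullDocument;
        }
    }

    // Mirrors the filter in the configuration: only these operation types are kept.
    static final Set<String> WATCHED_OPERATIONS = Set.of("insert", "update");

    static List<ChangeEvent> keepWatched(List<ChangeEvent> events) {
        return events.stream()
                .filter(e -> WATCHED_OPERATIONS.contains(e.operationType))
                .collect(Collectors.toList());
    }

    /** What a listener can work with: the full document if present, otherwise only the delta. */
    static Map<String, Object> payload(ChangeEvent event) {
        return event.fullDocument != null ? event.fullDocument : event.updatedFields;
    }

    public static void main(String[] args) {
        ChangeEvent insert = new ChangeEvent("insert", Map.of(),
                Map.of("_id", 1, "level", "HIGH", "deviceId", "d-7"));
        ChangeEvent updateWithoutLookup = new ChangeEvent("update", Map.of("level", "LOW"), null);
        ChangeEvent updateWithLookup = new ChangeEvent("update", Map.of("level", "LOW"),
                Map.of("_id", 1, "level", "LOW", "deviceId", "d-7"));
        ChangeEvent delete = new ChangeEvent("delete", Map.of(), null);

        List<ChangeEvent> kept = keepWatched(List.of(insert, updateWithoutLookup, updateWithLookup, delete));
        System.out.println("kept events: " + kept.size());
        System.out.println("delta only has deviceId: " + payload(updateWithoutLookup).containsKey("deviceId"));
        System.out.println("lookup has deviceId: " + payload(updateWithLookup).containsKey("deviceId"));
    }
}
```

Since the listener converts the event into a complete `Alarm` object, it needs the whole document rather than the delta, which is the practical reason for requesting the lookup.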

4.3.Implement the listener callback

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.xxx.alarm.model.Alarm;
import com.xxx.notification.event.AlarmEventPublisher;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;

@Slf4j
public class AlarmMessageListener implements MessageListener<ChangeStreamDocument<Document>, Object> {

    private final MongoTemplate mongoTemplate;
    private final ApplicationEventPublisher eventPublisher;

    public AlarmMessageListener(MongoTemplate mongoTemplate, ApplicationEventPublisher eventPublisher) {
        this.mongoTemplate = mongoTemplate;
        this.eventPublisher = eventPublisher;
    }

    @Override
    public void onMessage(Message<ChangeStreamDocument<Document>, Object> message) {
        ChangeStreamDocument<Document> changeStreamDocument = message.getRaw();
        if (changeStreamDocument == null) return;
        log.info("change stream event: {}", changeStreamDocument);
        // The full document can be missing, e.g. if it was deleted before the lookup ran.
        Document document = changeStreamDocument.getFullDocument();
        if (document == null) return;
        Alarm alarm = mongoTemplate.getConverter().read(Alarm.class, document);
        if (alarm == null) return;
        // Push the real-time message.
        AlarmEventPublisher.publishDeviceAlarmEvent(alarm, eventPublisher);
    }
}

5.With standalone MongoDB, AbstractMongoEventListener can also be used

Adding a custom listener is all that is needed.

import com.xxx.alarm.model.Alarm;
import com.xxx.notification.event.AlarmEventPublisher;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.mongodb.core.mapping.event.AbstractMongoEventListener;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
import org.springframework.stereotype.Component;

/**
 * Alarm listener.
 */
@Slf4j
@Component
public class AlarmMongoDataEventListener extends AbstractMongoEventListener<Alarm> {

    private final ApplicationEventPublisher eventPublisher;

    public AlarmMongoDataEventListener(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    // Invoked before the Alarm object is converted to a document and written to the database.
    @Override
    public void onBeforeConvert(BeforeConvertEvent<Alarm> alarmEvent) {
        Alarm alarm = alarmEvent.getSource();
        if (alarm == null) return;
        String notificationType = alarm.getNotificationType();
        if (StringUtils.isBlank(notificationType)) return;
        log.info("Before Convert alarm Event: {}", alarm);
        // Push the real-time message.
        AlarmEventPublisher.publishDeviceAlarmEvent(alarm, eventPublisher);
    }
}

6.Recommended documentation

MongoDB lifecycle events

Change streams on MongoDB clusters
