文章目录
处理多端数据同步
由于im服务支持用户在多个客户端登录,那么就会存在一个用户在多个端的数据同步问题。
- 轮询拉取:两台设备每隔一段时间就去拉取,这样做性能有很大的浪费
- 业务回调:在业务系统执行完业务操作时,调用tcp服务的接口,来通知用户的所有客户端去更新数据,但这样做增强了2个服务之间的依赖程度
- TCP通知:到收到好友请求后,并且处理成功后,就主动的去发送特定的指令给这个用户的其他端,并且将新添加的好友信息也附带过去,这时候收到这条特色树消息的用户,不需要请求服务器,只是根据这条消息进行更本地好友列表即可,这样既解决了空轮训,也解决了和业务系统强依赖的问题
准备工作
RedisConfig
注意下hashValue的redis序列化器
@Configuration
public class RedisConfig {
@Autowired
RedisConnectionFactory redisConnectionFactory;
@Bean
public RedisTemplate<Object, Object> redisTemplate() {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(mapper);
template.setValueSerializer(new StringRedisSerializer());
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
return template;
}
}
UserSessionUtils
在tcp服务的NettyServerHandler中登录成功时,会使用redisson向redis中写入会话对象,这里根据appId、userId将用户取出(注意反序列化)
@Component
public class UserSessionUtils {
@Autowired
StringRedisTemplate stringRedisTemplate;
// session的维护: {appId}:userSession:{userId} -> {clientType}:{imei} - {{userSession}}
// {clientType}:{imei} - {{userSession}}
// 获取用户所有的在线session
public List<UserSession> getUserSession(Integer appId, String userId) {
String userSessionKey = appId + Constants.RedisConstants.UserSessionConstants + userId;
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(userSessionKey);
List<UserSession> list = new ArrayList<>();
Collection<Object> values = entries.values();
for (Object o : values) {
String sessionStr = (String) o;
UserSession session = JSONObject.parseObject(sessionStr, UserSession.class);
// 仅筛选在线状态的session
if (ImConnectStatusEnum.ONLINE_STATUS.getCode().equals(session.getConnectState())) {
list.add(session);
}
}
return list;
}
// 获取指定用户在指定端的session
public UserSession getUserSession(Integer appId, String userId, Integer clientType, String imei) {
String userSessionKey = appId + Constants.RedisConstants.UserSessionConstants + userId;
String hashKey = clientType + ":" + imei;
Object o = stringRedisTemplate.opsForHash().get(userSessionKey, hashKey);
UserSession session = JSONObject.parseObject(o.toString(), UserSession.class);
return session;
}
}
逻辑层MessageProducer
逻辑层不能直接发消息给客户端,逻辑层需要通过通知tcp服务来发送消息给相应的客户端。逻辑层将消息先发送到rabbitmq,tcp层则在rabbitmq中订阅主题,绑定监听。
发送的时候,注意1个用户可能登录了多个端,那么消息就需要同步给除了当前客户端的其它端。
@Service
public class MessageProducer {
private static Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
UserSessionUtils userSessionUtils;
private boolean sendMessage(UserSession session, Object msg) {
try {
logger.info("send message == " + msg);
// (具体消费这条消息的逻辑可以查看 MessageReceiver 这个类
// 1. 每个tcp服务在启动的时候, 都会声明自己的1个队列: messageService2Pipeline{brokerId},
// 2. 并创建1个绑定关系是: 队列 -> messageService2Pipeline{brokerId}
// 交换机 -> messageService2Pipeline
// 路由key -> {brokerId}
// 所以每个tcp服务都会有自己的1个队列, 来绑定 messageService2Pipeline 这个交换机。
// 当携带不同的路由key发送到这个交换机时, 就会路由到不同的tcp服务处理,
// 而用户在登录的时候, 创建的session对象保存了它所登录的服务的brokerId,
// 这样, 就可以精确的让存有该session对象对应的channel的tcp服务消费该条消息, 并发送到对应的客户端了
// )
rabbitTemplate.convertAndSend(
// 将消息发送到名为 messageService2Pipeline 的交换机
Constants.RabbitConstants.MessageService2Im,
// 携带的路由key: brokerId
session.getBrokerId() + "",
// MessagePack对象 转为的 json字符串
msg
);
// todo rabbitmq发布消息及rabbitmq的配置
return true;
} catch (Exception e) {
logger.error("send error :" + e.getMessage());
return false;
}
}
// sendPack方法 调用上面的 sendMessage 方法
// MessagePack 是消息服务发送给tcp服务的包体, tcp服务 再根据改包体解析成 Message 发给客户端
public boolean sendPack(String toId, Command command, Object msg, UserSession session) {
// 将 msg 转为 JSONObject对象
// todo fastjson转换java对象的方式
JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(msg));
MessagePack messagePack = new MessagePack();
messagePack.setAppId(session.getAppId());
// session里面不也是有userId的么?这里干嘛还使用传入的toId
messagePack.setToId(toId);
messagePack.setClientType(session.getClientType());
messagePack.setImei(session.getImei());
// 指定操作指令
messagePack.setCommand(command.getCommand());
// 使用 MessagePack 将 msg转为JSONObject的对象 设置到data属性中
messagePack.setData(jsonObject);
// 将 MessagePack对象 转为 json字符串
String body = JSONObject.toJSONString(messagePack);
// 最终调用 sendMessage
return sendMessage(session, body);
}
// 发送给指定用户所有端的方法
public List<ClientInfo> sendToUser(String toId,
Command command,
Object data,
Integer appId) {
// 获取到用户的所有在线UserSession
List<UserSession> userSession = userSessionUtils.getUserSession(appId, toId);
List<ClientInfo> list = new ArrayList<>();
for (UserSession session : userSession) {
// 调用 sendPack 方法, 发送消息
boolean b = sendPack(toId, command, data, session);
if (b) {
list.add(new ClientInfo(session.getAppId(), session.getClientType(), session.getImei()));
}
}
return list;
}
// 发送给某个用户的指定客户端
public void sendToUser(String toId,
Command command,
Object data,
ClientInfo clientInfo) {
// 获取指定用户在指定端的session
UserSession userSession = userSessionUtils.getUserSession(
clientInfo.getAppId(),
toId,
clientInfo.getClientType(),
clientInfo.getImei()
);
sendPack(toId, command, data, userSession);
}
//发送给除了某一端的其他端(发送的时候,注意1个用户可能登录了多个端,那么消息就需要同步给除了当前客户端的其它端)
public void sendToUserExceptClient(String toId,
Command command,
Object data,
ClientInfo clientInfo) {
// 获取用户所有的在线session
List<UserSession> userSession = userSessionUtils.getUserSession(clientInfo.getAppId(), toId);
for (UserSession session : userSession) {
// 除了指定的客户端
if (!isMatch(session, clientInfo)) {
sendPack(toId, command, data, session);
}
}
}
private boolean isMatch(UserSession sessionDto, ClientInfo clientInfo) {
// appId、clientType、imei号 相同, 则认为匹配
return Objects.equals(sessionDto.getAppId(), clientInfo.getAppId())
&& Objects.equals(sessionDto.getClientType(), clientInfo.getClientType())
&& Objects.equals(sessionDto.getImei(), clientInfo.getImei());
}
// 这里做个兼容, 数据同步有可能是某个客户端发起的, 也可能是后台管理员发起的
public void sendToUser(String toId,
Integer clientType,
String imei,
Command command,
Object data,
Integer appId) {
// 如果 clientType 和 imei号 都不为空, 那就发给 指定用户的除了这个客户端的其它客户端
if (clientType != null && StringUtils.isNotBlank(imei)) {
ClientInfo clientInfo = new ClientInfo(appId, clientType, imei);
sendToUserExceptClient(toId, command, data, clientInfo);
} else {
// 否则, 发给指定用户的所有客户端
// (后台管理员调用时, 肯定没有imei号, 那就需要发给所有的客户端)
sendToUser(toId, command, data, appId);
}
}
}
逻辑层GroupMessageProducer
@Component
public class GroupMessageProducer {
@Autowired
MessageProducer messageProducer;
@Autowired
ImGroupMemberService imGroupMemberService;
public void producer(String userId, Command command, Object data, ClientInfo clientInfo) {
// 先从data中获取 groupId
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(data);
String groupId = jsonObject.getString("groupId");
// 获取组内的所有成员
List<String> groupMemberId = imGroupMemberService.getGroupMemberId(groupId, clientInfo.getAppId());
if (command.equals(GroupEventCommand.ADDED_MEMBER)) {
// 添加群成员的指令
// 需要发送给管理员和被加入人本身
// 获取群内的 管理员 和 群主
List<GroupMemberDto> groupManager = imGroupMemberService.getGroupManager(groupId, clientInfo.getAppId());
// 获取 添加 的 所有群成员id
AddGroupMemberPack addGroupMemberPack = jsonObject.toJavaObject(AddGroupMemberPack.class);
List<String> members = addGroupMemberPack.getMembers();
// 发送给所有的 群管理员 和 群主
for (GroupMemberDto groupMemberDto : groupManager) {
// 这里的userId是操作人
// (说明: 假设是管理员邀请人进群, 那么发起此邀请请求的管理员的客户端, 就不用收到这个消息了,
// 因此要发送给这个管理员的除了这个客户端的其它客户端)
if (clientInfo.getClientType() != ClientType.WEBAPI.getCode() && groupMemberDto.getMemberId().equals(userId)) {
messageProducer.sendToUserExceptClient(groupMemberDto.getMemberId(), command, data, clientInfo);
} else {
// 如果userId这个操作人 不是 当前正在遍历的 群管理员, 则将消息发给 当前正在遍历的 群管理员 的所有客户端
messageProducer.sendToUser(groupMemberDto.getMemberId(), command, data, clientInfo.getAppId());
}
}
// 发送给所有的群成员
for (String member : members) {
// 这里的userId是操作人
// (说明: 假设是群成员邀请人进群, 那么发起此邀请请求的成员的客户端, 就不用收到这个消息了,
// 因此要发送给这个群员的除了这个客户端的其它客户端)
if (clientInfo.getClientType() != ClientType.WEBAPI.getCode() && member.equals(userId)) {
messageProducer.sendToUserExceptClient(member, command, data, clientInfo);
} else {
// 如果userId这个操作人 不是 当前正在遍历的 群成员, 则将消息发给 当前正在遍历的 群管理员 的所有客户端
messageProducer.sendToUser(member, command, data, clientInfo.getAppId());
}
}
} else if (command.equals(GroupEventCommand.DELETED_MEMBER)) {
// 群内踢人时, 需要发送消息给被踢的人
RemoveGroupMemberPack pack = jsonObject.toJavaObject(RemoveGroupMemberPack.class);
// 被踢的人
String member = pack.getMember();
// 群内成员(肯定不包含当前已经被踢掉的人)
List<String> members = imGroupMemberService.getGroupMemberId(groupId, clientInfo.getAppId());
// 被踢掉的人
members.add(member);
// 发送给所有群成员
for (String memberId : members) {
// 这里的userId是操作人
// (说明: 假设当前遍历的人发起此踢人请求, 那 发起这个踢人请求的客户端就不用收到这个消息了,
// 因此要发送给发起这个踢人请求的人除了这个客户端的其它客户端)
if (clientInfo.getClientType() != ClientType.WEBAPI.getCode() && member.equals(userId)) {
messageProducer.sendToUserExceptClient(memberId, command, data, clientInfo);
} else {
// 发送给所有的人
messageProducer.sendToUser(memberId, command, data, clientInfo.getAppId());
}
}
} else if (command.equals(GroupEventCommand.UPDATED_MEMBER)) {
// 更新群成员的指令
UpdateGroupMemberPack pack = jsonObject.toJavaObject(UpdateGroupMemberPack.class);
String memberId = pack.getMemberId();
// 被更新的群成员
GroupMemberDto groupMemberDto = new GroupMemberDto();
groupMemberDto.setMemberId(memberId);
// 获取群管理员
List<GroupMemberDto> groupManager = imGroupMemberService.getGroupManager(groupId, clientInfo.getAppId());
// 群管理员 + 被更新的群成员
groupManager.add(groupMemberDto);
// 发给 群管理员 + 被更新的群成员
for (GroupMemberDto member : groupManager) {
// userId是操作人, 谁去更新的, 那就不要发给这个人的这个客户端了, 要发给这个操作人的其它客户端
if (clientInfo.getClientType() != ClientType.WEBAPI.getCode() && member.equals(userId)) {
messageProducer.sendToUserExceptClient(member.getMemberId(), command, data, clientInfo);
} else {
// 发给其它人的所有客户端
messageProducer.sendToUser(member.getMemberId(), command, data, clientInfo.getAppId());
}
}
} else {
// 发送给所有群成员
// 遍历组内所有成员
for (String memberId : groupMemberId) {
if (clientInfo.getClientType() != null && clientInfo.getClientType() != ClientType.WEBAPI.getCode() && memberId.equals(userId)) {
// 发送给 指定用户 除了指定客户端的其它客户端
messageProducer.sendToUserExceptClient(memberId, command, data, clientInfo);
} else {
// 发送给 指定用户 的 所有客户端
messageProducer.sendToUser(memberId, command, data, clientInfo.getAppId());
}
}
}
}
}
tcp层MessageReceiver
启动的时候, 就会传入1个brokerId, 到时候每个启动的tcp服务实例, 都会去创建各自的1个队列, 并且监听这个队列,并且创建的这个队列都与 messageService2Pipeline 交换机绑定, 当向这个 messageService2Pipeline 交换机投递消息时, 并携带的的路由key是brokerId, 都会路由到不同的队列中,这样就可以将消息路由到指定的tcp服务实例了(这里主要用来确定是哪个tcp服务实例与客户端建立了连接)
@Slf4j
public class MessageReceiver {
private static String brokerId;
private static void startReceiverMessage() {
try {
// 创建channel 名字是: messageService2Pipeline{brokerId}
Channel channel = MqFactory.getChannel(Constants.RabbitConstants.MessageService2Im + brokerId);
// 使用channel声明队列 队列是: messageService2Pipeline{brokerId}
channel.queueDeclare(Constants.RabbitConstants.MessageService2Im + brokerId, true, false, false, null);
// 使用channel声明绑定关系 绑定是: 队列->messageService2Pipeline{brokerId} 交换机->messageService2Pipeline 路由key->{brokerId}
channel.queueBind(Constants.RabbitConstants.MessageService2Im + brokerId, Constants.RabbitConstants.MessageService2Im, brokerId);
// 监听这个队列中的消息: messageService2Pipeline{brokerId}
channel.basicConsume(Constants.RabbitConstants.MessageService2Im + brokerId,
// 不自动确认
false,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msgStr = new String(body);
log.info(msgStr);
MessagePack messagePack = JSONObject.parseObject(msgStr, MessagePack.class);
BaseProcess messageProcess = ProcessFactory.getMessageProcess(messagePack.getCommand());
messageProcess.process(messagePack);
// 手动确认消息, 不批量
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
// 不确认消息, 不批量, 不重回队列
channel.basicNack(envelope.getDeliveryTag(),false,false);
}
}
}
);
} catch (Exception e) {
}
}
public static void init() {
startReceiverMessage();
}
/* 启动的时候, 就会传入1个brokerId, 到时候每个启动的tcp服务实例, 都会去创建各自的1个队列, 并且监听这个队列
并且创建的这个队列都与 messageService2Pipeline 交换机绑定,
当向这个 messageService2Pipeline 交换机投递消息时, 并携带的的路由key是brokerId, 都会路由到不同的队列中,
这样就可以将消息路由到指定的tcp服务实例了(这里主要用来确定是哪个tcp服务实例与客户端建立了连接)*/
public static void init(String brokerId) {
if (StringUtils.isBlank(MessageReceiver.brokerId)) {
MessageReceiver.brokerId = brokerId;
}
startReceiverMessage();
}
}
MqFactory
需要引入com.rabbitmq / amqp-client / 5.6.0
依赖
public class MqFactory {
private static ConnectionFactory factory = null;
private static Channel defaultChannel;
private static ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();
private static Connection getConnection() throws IOException, TimeoutException {
Connection connection = factory.newConnection();
return connection;
}
public static Channel getChannel(String channelName) throws IOException, TimeoutException {
Channel channel = channelMap.get(channelName);
if(channel == null){
channel = getConnection().createChannel();
channelMap.put(channelName,channel);
}
return channel;
}
public static void init(BootstrapConfig.Rabbitmq rabbitmq){
if(factory == null){
factory = new ConnectionFactory();
factory.setHost(rabbitmq.getHost());
factory.setPort(rabbitmq.getPort());
factory.setUsername(rabbitmq.getUserName());
factory.setPassword(rabbitmq.getPassword());
factory.setVirtualHost(rabbitmq.getVirtualHost());
}
}
}
ProcessFactory
在MessageReceiver中使用了ProcessFactory根据传递过来的command指令找到对应的BaseProcess。这里面也可以根据不同的command返回不同的BaseProcess(可扩展自BaseProcess的子类),来处理这个消息
public class ProcessFactory {
private static BaseProcess defaultProcess;
static {
defaultProcess = new BaseProcess() {
@Override
public void processBefore() {
}
@Override
public void processAfter() {
}
};
}
public static BaseProcess getMessageProcess(Integer command) {
return defaultProcess;
}
}
BaseProcess
public abstract class BaseProcess {
public abstract void processBefore();
public void process(MessagePack messagePack){
processBefore();
// 从SessionSocketHolder中拿到对应的channel
NioSocketChannel channel = SessionSocketHolder.get(
messagePack.getAppId(),
messagePack.getToId(),
messagePack.getClientType(),
messagePack.getImei()
);
if(channel != null){
channel.writeAndFlush(messagePack);
}
processAfter();
}
public abstract void processAfter();
}
TCP通知
其实就是在做完业务后,发送消息到
用户资料变更通知
@Override
@Transactional
public ResponseVO modifyUserInfo(ModifyUserInfoReq req) {
QueryWrapper query = new QueryWrapper<>();
query.eq("app_id", req.getAppId());
query.eq("user_id", req.getUserId());
query.eq("del_flag", DelFlagEnum.NORMAL.getCode());
ImUserDataEntity user = imUserDataMapper.selectOne(query);
if (user == null) {
throw new ApplicationException(UserErrorCode.USER_IS_NOT_EXIST);
}
ImUserDataEntity update = new ImUserDataEntity();
BeanUtils.copyProperties(req, update);
update.setAppId(null);
update.setUserId(null);
int update1 = imUserDataMapper.update(update, query);
if (update1 == 1) {
UserModifyPack pack = new UserModifyPack();
BeanUtils.copyProperties(req, pack);
// 由用户发起的请求会携带clientType和imei号(不需要通知给发起请求的当前用户的客户端),
// 而由后台发起的请求不会携带imei号(需要发送给所有的客户端),
messageProducer.sendToUser(
req.getUserId(),
req.getClientType(),
req.getImei(),
UserEventCommand.USER_MODIFY,
pack,
req.getAppId()
);
// 用户资料变更之后回调开关 是否开启
// 如果开启了的话, 在变更完用户信息成功之后, 向指定的url发起回调请求
if (appConfig.isModifyUserAfterCallback()) {
callbackService.callback(
req.getAppId(),
Constants.CallbackCommand.ModifyUserAfter, // user.modify.after
JSONObject.toJSONString(req)
);
}
return ResponseVO.successResponse();
}
throw new ApplicationException(UserErrorCode.MODIFY_USER_ERROR);
}
好友模块TCP通知
略
群组模块TCP通知
略