0
点赞
收藏
分享

微信扫一扫

Introduction to Data Mining 数据挖掘

北冥有一鲲 18小时前 阅读 2
javanetty

文章目录

处理多端数据同步

由于im服务支持用户在多个客户端登录,那么就会存在一个用户在多个端的数据同步问题。

  1. 轮询拉取:两台设备每隔一段时间就去拉取,这样做性能有很大的浪费
  2. 业务回调:在业务系统执行完业务操作时,调用tcp服务的接口,来通知用户的所有客户端去更新数据,但这样做增强了2个服务之间的依赖程度
  3. TCP通知:到收到好友请求后,并且处理成功后,就主动的去发送特定的指令给这个用户的其他端,并且将新添加的好友信息也附带过去,这时候收到这条特色树消息的用户,不需要请求服务器,只是根据这条消息进行更本地好友列表即可,这样既解决了空轮训,也解决了和业务系统强依赖的问题

准备工作

RedisConfig

注意下hashValue的redis序列化器

@Configuration
public class RedisConfig {

    @Autowired
    RedisConnectionFactory redisConnectionFactory;

    @Bean
    public RedisTemplate<Object, Object> redisTemplate() {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
        Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(mapper);

        template.setValueSerializer(new StringRedisSerializer());
        //使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        template.setHashKeySerializer(new StringRedisSerializer());
        
        template.setHashValueSerializer(serializer);

        return template;
    }
}

UserSessionUtils

在tcp服务的NettyServerHandler中登录成功时,会使用redisson向redis中写入会话对象,这里根据appId、userId将用户取出(注意反序列化)

@Component
public class UserSessionUtils {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    // session的维护: {appId}:userSession:{userId} -> {clientType}:{imei} - {{userSession}}
    //                                               {clientType}:{imei} - {{userSession}}

    // 获取用户所有的在线session
    public List<UserSession> getUserSession(Integer appId, String userId) {

        String userSessionKey = appId + Constants.RedisConstants.UserSessionConstants + userId;
        Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(userSessionKey);
        List<UserSession> list = new ArrayList<>();
        Collection<Object> values = entries.values();
        for (Object o : values) {
            String sessionStr = (String) o;
            UserSession session = JSONObject.parseObject(sessionStr, UserSession.class);
            // 仅筛选在线状态的session
            if (ImConnectStatusEnum.ONLINE_STATUS.getCode().equals(session.getConnectState())) {
                list.add(session);
            }
        }
        return list;
    }

    // 获取指定用户在指定端的session
    public UserSession getUserSession(Integer appId, String userId, Integer clientType, String imei) {
        String userSessionKey = appId + Constants.RedisConstants.UserSessionConstants + userId;
        String hashKey = clientType + ":" + imei;
        Object o = stringRedisTemplate.opsForHash().get(userSessionKey, hashKey);
        UserSession session = JSONObject.parseObject(o.toString(), UserSession.class);
        return session;
    }


}

逻辑层MessageProducer

逻辑层不能直接发消息给客户端,逻辑层需要通过通知tcp服务来发送消息给相应的客户端。逻辑层将消息先发送到rabbitmq,tcp层则在rabbitmq中订阅主题,绑定监听。

发送的时候,注意1个用户可能登录了多个端,那么消息就需要同步给除了当前客户端的其它端。

@Service
public class MessageProducer {

    private static Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    UserSessionUtils userSessionUtils;

    private boolean sendMessage(UserSession session, Object msg) {

        try {

            logger.info("send message == " + msg);

            // (具体消费这条消息的逻辑可以查看 MessageReceiver 这个类
            //      1. 每个tcp服务在启动的时候, 都会声明自己的1个队列: messageService2Pipeline{brokerId},
            //      2. 并创建1个绑定关系是: 队列    -> messageService2Pipeline{brokerId}
            //                           交换机  -> messageService2Pipeline
            //                           路由key -> {brokerId}
            //  所以每个tcp服务都会有自己的1个队列, 来绑定 messageService2Pipeline 这个交换机。
            //  当携带不同的路由key发送到这个交换机时, 就会路由到不同的tcp服务处理,
            //  而用户在登录的时候, 创建的session对象保存了它所登录的服务的brokerId,
            //  这样, 就可以精确的让存有该session对象对应的channel的tcp服务消费该条消息, 并发送到对应的客户端了
            // )
            rabbitTemplate.convertAndSend(
                    // 将消息发送到名为 messageService2Pipeline 的交换机
                    Constants.RabbitConstants.MessageService2Im,
                    // 携带的路由key: brokerId
                    session.getBrokerId() + "",
                    // MessagePack对象 转为的 json字符串
                    msg
            );

            // todo rabbitmq发布消息及rabbitmq的配置
            return true;

        } catch (Exception e) {
            logger.error("send error :" + e.getMessage());
            return false;
        }
    }

    // sendPack方法 调用上面的 sendMessage 方法
    // MessagePack 是消息服务发送给tcp服务的包体, tcp服务 再根据改包体解析成 Message 发给客户端
    public boolean sendPack(String toId, Command command, Object msg, UserSession session) {

        // 将 msg 转为 JSONObject对象
        // todo fastjson转换java对象的方式
        JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(msg));

        MessagePack messagePack = new MessagePack();

        messagePack.setAppId(session.getAppId());
        // session里面不也是有userId的么?这里干嘛还使用传入的toId
        messagePack.setToId(toId);
        messagePack.setClientType(session.getClientType());
        messagePack.setImei(session.getImei());

        // 指定操作指令
        messagePack.setCommand(command.getCommand());

        // 使用 MessagePack 将 msg转为JSONObject的对象 设置到data属性中
        messagePack.setData(jsonObject);

        // 将 MessagePack对象 转为 json字符串
        String body = JSONObject.toJSONString(messagePack);

        // 最终调用 sendMessage
        return sendMessage(session, body);
    }

    // 发送给指定用户所有端的方法
    public List<ClientInfo> sendToUser(String toId,
                                       Command command,
                                       Object data,
                                       Integer appId) {

        // 获取到用户的所有在线UserSession
        List<UserSession> userSession = userSessionUtils.getUserSession(appId, toId);

        List<ClientInfo> list = new ArrayList<>();

        for (UserSession session : userSession) {

            // 调用 sendPack 方法, 发送消息
            boolean b = sendPack(toId, command, data, session);

            if (b) {
                list.add(new ClientInfo(session.getAppId(), session.getClientType(), session.getImei()));
            }
        }
        return list;
    }

    // 发送给某个用户的指定客户端
    public void sendToUser(String toId,
                           Command command,
                           Object data,
                           ClientInfo clientInfo) {

        // 获取指定用户在指定端的session
        UserSession userSession = userSessionUtils.getUserSession(
                clientInfo.getAppId(),
                toId,
                clientInfo.getClientType(),
                clientInfo.getImei()
        );
        sendPack(toId, command, data, userSession);
    }

    //发送给除了某一端的其他端(发送的时候,注意1个用户可能登录了多个端,那么消息就需要同步给除了当前客户端的其它端)
    public void sendToUserExceptClient(String toId,
                                       Command command,
                                       Object data,
                                       ClientInfo clientInfo) {
        // 获取用户所有的在线session
        List<UserSession> userSession = userSessionUtils.getUserSession(clientInfo.getAppId(), toId);

        for (UserSession session : userSession) {

            // 除了指定的客户端
            if (!isMatch(session, clientInfo)) {
                sendPack(toId, command, data, session);
            }
        }
    }


    private boolean isMatch(UserSession sessionDto, ClientInfo clientInfo) {
        // appId、clientType、imei号 相同, 则认为匹配
        return Objects.equals(sessionDto.getAppId(), clientInfo.getAppId())
                && Objects.equals(sessionDto.getClientType(), clientInfo.getClientType())
                && Objects.equals(sessionDto.getImei(), clientInfo.getImei());
    }

    // 这里做个兼容, 数据同步有可能是某个客户端发起的, 也可能是后台管理员发起的
    public void sendToUser(String toId,
                           Integer clientType,
                           String imei,
                           Command command,
                           Object data,
                           Integer appId) {
        // 如果 clientType 和 imei号 都不为空, 那就发给 指定用户的除了这个客户端的其它客户端
        if (clientType != null && StringUtils.isNotBlank(imei)) {
            ClientInfo clientInfo = new ClientInfo(appId, clientType, imei);
            sendToUserExceptClient(toId, command, data, clientInfo);
        } else {
            // 否则, 发给指定用户的所有客户端
            // (后台管理员调用时, 肯定没有imei号, 那就需要发给所有的客户端)
            sendToUser(toId, command, data, appId);
        }
    }


}

逻辑层GroupMessageProducer

@Component
public class GroupMessageProducer {

    @Autowired
    MessageProducer messageProducer;

    @Autowired
    ImGroupMemberService imGroupMemberService;

    public void producer(String userId, Command command, Object data, ClientInfo clientInfo) {

        // 先从data中获取 groupId
        JSONObject jsonObject = (JSONObject) JSONObject.toJSON(data);
        String groupId = jsonObject.getString("groupId");

        // 获取组内的所有成员
        List<String> groupMemberId = imGroupMemberService.getGroupMemberId(groupId, clientInfo.getAppId());

        if (command.equals(GroupEventCommand.ADDED_MEMBER)) {

            // 添加群成员的指令
            // 需要发送给管理员和被加入人本身

            // 获取群内的 管理员 和 群主
            List<GroupMemberDto> groupManager = imGroupMemberService.getGroupManager(groupId, clientInfo.getAppId());

            // 获取 添加 的 所有群成员id
            AddGroupMemberPack addGroupMemberPack = jsonObject.toJavaObject(AddGroupMemberPack.class);
            List<String> members = addGroupMemberPack.getMembers();

            // 发送给所有的 群管理员 和 群主
            for (GroupMemberDto groupMemberDto : groupManager) {
                // 这里的userId是操作人
                // (说明: 假设是管理员邀请人进群, 那么发起此邀请请求的管理员的客户端, 就不用收到这个消息了,
                //        因此要发送给这个管理员的除了这个客户端的其它客户端)
                if (clientInfo.getClientType() != ClientType.WEBAPI.getCode() && groupMemberDto.getMemberId().equals(userId)) {
                    messageProducer.sendToUserExceptClient(groupMemberDto.getMemberId(), command, data, clientInfo);
                } else {
                    // 如果userId这个操作人 不是 当前正在遍历的 群管理员, 则将消息发给 当前正在遍历的 群管理员 的所有客户端
                    messageProducer.sendToUser(groupMemberDto.getMemberId(), command, data, clientInfo.getAppId());
                }
            }

            // 发送给所有的群成员
            for (String member : members) {
                // 这里的userId是操作人
                // (说明: 假设是群成员邀请人进群, 那么发起此邀请请求的成员的客户端, 就不用收到这个消息了,
                //        因此要发送给这个群员的除了这个客户端的其它客户端)
                if (clientInfo.getClientType() != ClientType.WEBAPI.getCode() && member.equals(userId)) {
                    messageProducer.sendToUserExceptClient(member, command, data, clientInfo);
                } else {
                    // 如果userId这个操作人 不是 当前正在遍历的 群成员, 则将消息发给 当前正在遍历的 群管理员 的所有客户端
                    messageProducer.sendToUser(member, command, data, clientInfo.getAppId());
                }
            }

        } else if (command.equals(GroupEventCommand.DELETED_MEMBER)) {

            // 群内踢人时, 需要发送消息给被踢的人

            RemoveGroupMemberPack pack = jsonObject.toJavaObject(RemoveGroupMemberPack.class);

            // 被踢的人
            String member = pack.getMember();

            // 群内成员(肯定不包含当前已经被踢掉的人)
            List<String> members = imGroupMemberService.getGroupMemberId(groupId, clientInfo.getAppId());

            // 被踢掉的人
            members.add(member);

            // 发送给所有群成员
            for (String memberId : members) {
                // 这里的userId是操作人
                // (说明: 假设当前遍历的人发起此踢人请求, 那 发起这个踢人请求的客户端就不用收到这个消息了,
                //        因此要发送给发起这个踢人请求的人除了这个客户端的其它客户端)
                if (clientInfo.getClientType() != ClientType.WEBAPI.getCode() && member.equals(userId)) {
                    messageProducer.sendToUserExceptClient(memberId, command, data, clientInfo);
                } else {
                    // 发送给所有的人
                    messageProducer.sendToUser(memberId, command, data, clientInfo.getAppId());
                }
            }

        } else if (command.equals(GroupEventCommand.UPDATED_MEMBER)) {

            // 更新群成员的指令

            UpdateGroupMemberPack pack = jsonObject.toJavaObject(UpdateGroupMemberPack.class);
            String memberId = pack.getMemberId();

            // 被更新的群成员
            GroupMemberDto groupMemberDto = new GroupMemberDto();
            groupMemberDto.setMemberId(memberId);

            // 获取群管理员
            List<GroupMemberDto> groupManager = imGroupMemberService.getGroupManager(groupId, clientInfo.getAppId());

            // 群管理员 + 被更新的群成员
            groupManager.add(groupMemberDto);

            // 发给 群管理员 + 被更新的群成员
            for (GroupMemberDto member : groupManager) {
                // userId是操作人, 谁去更新的, 那就不要发给这个人的这个客户端了, 要发给这个操作人的其它客户端
                if (clientInfo.getClientType() != ClientType.WEBAPI.getCode() && member.equals(userId)) {
                    messageProducer.sendToUserExceptClient(member.getMemberId(), command, data, clientInfo);
                } else {
                    // 发给其它人的所有客户端
                    messageProducer.sendToUser(member.getMemberId(), command, data, clientInfo.getAppId());
                }
            }
        } else {

            // 发送给所有群成员

            // 遍历组内所有成员
            for (String memberId : groupMemberId) {
                if (clientInfo.getClientType() != null && clientInfo.getClientType() != ClientType.WEBAPI.getCode() && memberId.equals(userId)) {
                    // 发送给 指定用户 除了指定客户端的其它客户端
                    messageProducer.sendToUserExceptClient(memberId, command, data, clientInfo);
                } else {
                    // 发送给 指定用户 的 所有客户端
                    messageProducer.sendToUser(memberId, command, data, clientInfo.getAppId());
                }
            }
        }


    }

}

tcp层MessageReceiver

启动的时候, 就会传入1个brokerId, 到时候每个启动的tcp服务实例, 都会去创建各自的1个队列, 并且监听这个队列,并且创建的这个队列都与 messageService2Pipeline 交换机绑定, 当向这个 messageService2Pipeline 交换机投递消息时, 并携带的的路由key是brokerId, 都会路由到不同的队列中,这样就可以将消息路由到指定的tcp服务实例了(这里主要用来确定是哪个tcp服务实例与客户端建立了连接)

@Slf4j
public class MessageReceiver {

    private static String brokerId;

    private static void startReceiverMessage() {

        try {

            // 创建channel           名字是: messageService2Pipeline{brokerId}
            Channel channel = MqFactory.getChannel(Constants.RabbitConstants.MessageService2Im + brokerId);

            // 使用channel声明队列     队列是: messageService2Pipeline{brokerId}
            channel.queueDeclare(Constants.RabbitConstants.MessageService2Im + brokerId, true, false, false, null);

            // 使用channel声明绑定关系  绑定是: 队列->messageService2Pipeline{brokerId}  交换机->messageService2Pipeline 路由key->{brokerId}
            channel.queueBind(Constants.RabbitConstants.MessageService2Im + brokerId, Constants.RabbitConstants.MessageService2Im, brokerId);

            // 监听这个队列中的消息: messageService2Pipeline{brokerId}
            channel.basicConsume(Constants.RabbitConstants.MessageService2Im + brokerId,

                    // 不自动确认
                    false,

                    new DefaultConsumer(channel) {

                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                            try {

                                String msgStr = new String(body);

                                log.info(msgStr);

                                MessagePack messagePack = JSONObject.parseObject(msgStr, MessagePack.class);
                                BaseProcess messageProcess = ProcessFactory.getMessageProcess(messagePack.getCommand());
                                messageProcess.process(messagePack);

                                // 手动确认消息, 不批量
                                channel.basicAck(envelope.getDeliveryTag(),false);

                            }catch (Exception e){

                                e.printStackTrace();

                                // 不确认消息, 不批量, 不重回队列
                                channel.basicNack(envelope.getDeliveryTag(),false,false);
                            }
                        }
                    }
            );
        } catch (Exception e) {

        }
    }

    public static void init() {
        startReceiverMessage();
    }

    /* 启动的时候, 就会传入1个brokerId, 到时候每个启动的tcp服务实例, 都会去创建各自的1个队列, 并且监听这个队列
       并且创建的这个队列都与 messageService2Pipeline 交换机绑定,
       当向这个 messageService2Pipeline 交换机投递消息时, 并携带的的路由key是brokerId, 都会路由到不同的队列中,
       这样就可以将消息路由到指定的tcp服务实例了(这里主要用来确定是哪个tcp服务实例与客户端建立了连接)*/
    public static void init(String brokerId) {
        if (StringUtils.isBlank(MessageReceiver.brokerId)) {
            MessageReceiver.brokerId = brokerId;
        }
        startReceiverMessage();
    }
}

MqFactory

需要引入com.rabbitmq / amqp-client / 5.6.0依赖

public class MqFactory {

    private static ConnectionFactory factory = null;

    private static Channel defaultChannel;

    private static ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();

    private static Connection getConnection() throws IOException, TimeoutException {
        Connection connection = factory.newConnection();
        return connection;
    }

    public static Channel getChannel(String channelName) throws IOException, TimeoutException {
        Channel channel = channelMap.get(channelName);
        if(channel == null){
            channel = getConnection().createChannel();
            channelMap.put(channelName,channel);
        }
        return channel;
    }

    public static void init(BootstrapConfig.Rabbitmq rabbitmq){
        if(factory == null){
            factory = new ConnectionFactory();
            factory.setHost(rabbitmq.getHost());
            factory.setPort(rabbitmq.getPort());
            factory.setUsername(rabbitmq.getUserName());
            factory.setPassword(rabbitmq.getPassword());
            factory.setVirtualHost(rabbitmq.getVirtualHost());
        }
    }

}

ProcessFactory

在MessageReceiver中使用了ProcessFactory根据传递过来的command指令找到对应的BaseProcess。这里面也可以根据不同的command返回不同的BaseProcess(可扩展自BaseProcess的子类),来处理这个消息

public class ProcessFactory {

    private static BaseProcess defaultProcess;

    static {
        defaultProcess = new BaseProcess() {
            @Override
            public void processBefore() {

            }

            @Override
            public void processAfter() {

            }
        };
    }

    public static BaseProcess getMessageProcess(Integer command) {
        return defaultProcess;
    }

}

BaseProcess
public abstract class BaseProcess {

    public abstract void processBefore();

    public void process(MessagePack messagePack){
        processBefore();

        // 从SessionSocketHolder中拿到对应的channel
        NioSocketChannel channel = SessionSocketHolder.get(
                messagePack.getAppId(),
                messagePack.getToId(),
                messagePack.getClientType(),
                messagePack.getImei()
        );
        if(channel != null){
            channel.writeAndFlush(messagePack);
        }
        processAfter();
    }

    public abstract void processAfter();

}

TCP通知

其实就是在做完业务后,发送消息到

用户资料变更通知

@Override
@Transactional
public ResponseVO modifyUserInfo(ModifyUserInfoReq req) {
    QueryWrapper query = new QueryWrapper<>();
    query.eq("app_id", req.getAppId());
    query.eq("user_id", req.getUserId());
    query.eq("del_flag", DelFlagEnum.NORMAL.getCode());
    ImUserDataEntity user = imUserDataMapper.selectOne(query);
    if (user == null) {
        throw new ApplicationException(UserErrorCode.USER_IS_NOT_EXIST);
    }

    ImUserDataEntity update = new ImUserDataEntity();
    BeanUtils.copyProperties(req, update);

    update.setAppId(null);
    update.setUserId(null);
    int update1 = imUserDataMapper.update(update, query);
    if (update1 == 1) {

        UserModifyPack pack = new UserModifyPack();
        BeanUtils.copyProperties(req, pack);

        // 由用户发起的请求会携带clientType和imei号(不需要通知给发起请求的当前用户的客户端),
        // 而由后台发起的请求不会携带imei号(需要发送给所有的客户端),
        messageProducer.sendToUser(
                req.getUserId(),
                req.getClientType(),
                req.getImei(),
                UserEventCommand.USER_MODIFY,
                pack,
                req.getAppId()
        );

        // 用户资料变更之后回调开关 是否开启
        // 如果开启了的话, 在变更完用户信息成功之后, 向指定的url发起回调请求
        if (appConfig.isModifyUserAfterCallback()) {
            callbackService.callback(
                    req.getAppId(),
                    Constants.CallbackCommand.ModifyUserAfter, // user.modify.after
                    JSONObject.toJSONString(req)
            );
        }
        return ResponseVO.successResponse();
    }
    throw new ApplicationException(UserErrorCode.MODIFY_USER_ERROR);
}

好友模块TCP通知

群组模块TCP通知

接口鉴权

举报

相关推荐

0 条评论