0
点赞
收藏
分享

微信扫一扫

【Spring Cloud】新闻头条微服务项目:实时创建ES索引并引入MongoDB实现搜索历史记录及关键词联想

8420b26844034fab91b6df661ae68671.png

个人简介: 

前言:

一:文章自动构建索引

1.需求分析

        在上一篇文章中我们完成了利用Elastic Search来对文章进行检索,在一开始时候我们先是创建了一个映射表,然后就是对以前提交的文章进行批量创建索引。但是对于后面发布的文章我们就不应该再像前面那样批量创建索引了,会存在一个时间差问题,我们应该针对发布的文章实时创建一个ES索引。

2.实现思路

        实现思路是在文章微服务将审核成功的文章保存之前就将其装入消息队列中,然后让搜索微服务进行消息的拉取并消费。为什么不选择直接在文章审核成功之后在自媒体端就将消息放入队列呢?要知道我们创建的映射表中是有文章静态URL(即MinIO中的路径)的,这个URL只有在文章微服务中成功将文章保存值MinIO中才能生成,因此需要在文章微服务中进行消息的发送。 

3.代码实现

①在ArticleFreemarkerServiceImpl中改造代码如下:

/**
 * 异步生成静态页面到Minio
 * @param apArticle
 * @param apArticleContent
 */
@Override
@Async
public void buildArticleToMinIO(ApArticle apArticle, String apArticleContent) {
    StringWriter out = new StringWriter();
    if(StringUtils.isNotBlank(apArticleContent)){
        //3.修改ap_article表,保存static_url字段
        ApArticle article = new ApArticle();
        article.setId(apArticle.getId());
        article.setStaticUrl(path);
        apArticleMapper.updateById(article);

        //4.创建索引,发送消息
        createArticleESIndex(article,apArticleContent,path);
    }
}

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

/**
 * 创建索引,发送消息
 * @param article
 * @param apArticleContent
 * @param path
 */
private void createArticleESIndex(ApArticle article, String apArticleContent, String path) {
    SearchArticleVo searchArticleVo = new SearchArticleVo();
    BeanUtils.copyProperties(apArticleContent,searchArticleVo);
    searchArticleVo.setContent(apArticleContent);
    searchArticleVo.setStaticUrl(path);

    //发送消息
    kafkaTemplate.send(ArticleConstas.ARTICLE_ES_SYNC_TOPIC, JSON.toJSONString(searchArticleVo));
}

②在文章微服务的nacos的配置中心添加如下配置

kafka:
    bootstrap-servers: 12.8.10.10:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

③搜索微服务中添加kafka的配置,nacos配置如下

spring:
  kafka:
    bootstrap-servers: 19.1.20.30:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

④定义监听队列进行消息的消费

package com.my.search.listener;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.common.constans.ArticleConstas;
import com.my.model.search.vos.SearchArticleVo;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Slf4j
@Component
public class SyncArticleListener {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @KafkaListener(topics = ArticleConstas.ARTICLE_ES_SYNC_TOPIC)
    public void onMessage(String message) {
        if(StringUtils.isNotBlank(message)) {
            log.info("接收到消息:{}",message);

            SearchArticleVo vo = JSON.parseObject(message, SearchArticleVo.class);
            IndexRequest indexRequest = new IndexRequest("app_info_article");
            indexRequest.id(vo.getId().toString());
            indexRequest.source(message, XContentType.JSON);
            try {
                restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

二:搜索记录

1.需求分析

        在移动端进行搜索时候,我们应该记录每个用户的搜索记录,并且还要进行回显。这时候你可能会想到用MySQL对每个用户都建立一张搜索记录表,当用户量不大时候这样做确实可以,但是随着用户量的增加,就需要创建成千上万张表,而且我们一般只记录用户的有限条搜索记录,这样就会存在表很多但是每张表存储的数据很少的情况,容易造成资源浪费,而且效率不高,维护困难,这时候用MongoDB进行存储就比较合适。

2.实现思路

        前面我们在用户输入关键词进行搜索时可以发送一个异步请求将搜索关键字进行保存,保存之前需要判断该搜索记录是否存在,若存在则更新搜索时间,否则还要判断数据量是否超过10条,逻辑如下图所示:

3.代码实现

在tbug-headlines-search中新增ApUserSearchService并增加insert方法

@Autowired
private MongoTemplate mongoTemplate;
/**
 * 保存搜索记录
 * @param keyWord
 * @param userId
 */
@Override
@Async
public void insert(String keyWord, Integer userId) {
    log.info("保存搜索历史...");
    //1.查询当前用户搜索关键词
    Query query = Query.query(Criteria.where("userId").is(userId).and("keyword").is(keyWord));
    ApUserSearch apUserSearch = mongoTemplate.findOne(query,ApUserSearch.class);

    //2.存在该关键词,更新时间
    if(apUserSearch != null) {
        log.info("关键词{}已存在,更新时间",keyWord);
        apUserSearch.setCreatedTime(new Date());
        mongoTemplate.save(apUserSearch);
        return;
    }

    //3.不存在关键词
    log.info("不存在该关键词");
    apUserSearch = new ApUserSearch();
    apUserSearch.setUserId(userId);
    apUserSearch.setKeyword(keyWord);
    apUserSearch.setCreatedTime(new Date());

    Query query1 = Query.query(Criteria.where("userId").is(userId));
    query1.with(Sort.by(Sort.Direction.DESC,"createdTime"));

    List<ApUserSearch> userSearchList = mongoTemplate.find(query1, ApUserSearch.class);
    //3.1搜索记录条数不大于10
    if(userSearchList == null || userSearchList.size() < 10) {
        log.info("搜索记录数不大于10,保存关键词{}",keyWord);
        mongoTemplate.save(apUserSearch);
    } else {
        //3.2搜索记录条数大于10
        //3.2.1获取时间最旧的数据
        ApUserSearch apUserSearch1 = userSearchList.get(userSearchList.size() - 1);
        log.info("关键词{}替换成{}",apUserSearch1.getKeyword(),keyWord);
        //3.2.2替换最后一条记录
        mongoTemplate.findAndReplace(Query.query(Criteria.where("id").is(apUserSearch1.getId())),apUserSearch);
    }

}

在ArticleSearchServiceImpl的search方法中增加异步调用方法:

//异步调用保存历史记录
ApUser user = AppThreadLocalUtils.getUser();
//用户信息不为空并且为首页搜索才进行保存
if(user != null && dto.getFromIndex() == 0) {
    apUserSearchService.insert(dto.getSearchWords(),user.getId());
}

回显搜索历史记录

/**
 * 加载历史记录
 * @return
 */
@Override
public ResponseResult loadHistory() {
    log.info("加载用户历史搜索记录...");
    //1.获取当前用户id
    ApUser user = AppThreadLocalUtils.getUser();
    if(user != null) {
        Integer userId = user.getId();

        //2.查询该用户历史搜索记录
        Query query = Query.query(Criteria.where("userId").is(userId));
        query.with(Sort.by(Sort.Direction.DESC,"createdTime"));
        List<ApUserSearch> searchList = mongoTemplate.find(query, ApUserSearch.class);
        
        //3.返回结果
        return ResponseResult.okResult(searchList);
    } else {
        return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
    }
}

至于删除历史搜索记录这里就不放代码了,可以自己尝试实现一下。

三:关键词联想

1.需求分析

相信大家都有用过百度搜索,当我们输入关键字之后会有搜索提示,如下图:

 而我们也要在移动端实现这个效果。

2.实现思路

        其实能够实现关键字的联想,主要还是要靠维护一个搜索词数据库,当用户在搜索框输入关键字时候就会不断发送请求到该数据库进行查询并返回,数据库中存储的一般是搜索频率比较高的一些词,通常在企业中有两部分来源:

  • 自己维护搜索词,通过分析用户搜索频率较高的词,按照排名作为搜索词
  • 第三方获取,关键词规划师(百度)、5118、爱站网

3. 代码实现

前面说到当检测到用户输入的关键字有发生变化时候就会发送查询请求,而我们需要快速响应这个请求,因此使用MongoDB比较合适,其接口定义如下:

说明
接口路径/api/v1/associate/search
请求方式POST
参数UserSearchDto
响应结果ResponseResult

新增ApAssociateWordsService并增加如下方法:

@Autowired
private MongoTemplate mongoTemplate;

/**
 * 获取联想词
 * @param dto
 * @return
 */
@Override
public ResponseResult getAssociationWord(UserSearchDto dto) {
    if(dto == null || StringUtils.isBlank(dto.getSearchWords())) {
        return  ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    //分页查询
    if(dto.getPageSize() > 20) {
        dto.setPageSize(20);
    }

    //模糊查询
    Query query = Query.query(Criteria.where("associateWords").regex(".*\\" + dto.getSearchWords() + ".*"));
    query.limit(dto.getPageSize());
    List<ApAssociateWords> apAssociateWords = mongoTemplate.find(query, ApAssociateWords.class);
    return ResponseResult.okResult(apAssociateWords);
}

下篇预告:Kafka Stream实现文章热度实时计算

 友情链接: 牛客网  刷题|面试|找工作神器

举报

相关推荐

0 条评论