0
点赞
收藏
分享

微信扫一扫

6.Elasticsearch,分布式搜索引擎

minute_5 2022-04-02 阅读 40

文章目录


项目源码可以在 https://gitee.com/ShayneC/community获取

Elasticsearch,分布式搜索引擎

1. Elasticsearch入门

Elasticsearch简介

  • 一个分布式的、Restful风格(请求标准的描述)的搜索引擎。
    • 支持对各种类型的数据的检索。
    • 搜索速度快,可以提供实时的搜索服务。
    • 便于水平扩展,每秒可以处理PB级海量数据。

Elasticsearch术语

  • 索引(对应数据库)、类型(对应表)、文档(表里一行)、字段(一列)。 最新的版本类型被废弃。
  • 集群(服务器组合在一起)、节点(集群中每台服务器)、分片(对索引的划分)、副本(分片的备份)。

通过ES搜索的数据必须要在ES中转存一份,某种角度来说它是一个数据库。

Elasticsearch使用

  • 安装、修改配置文件

    • elasticsearch.yml文件,修改cluster.name,path.data,path.logs
    • 配置环境变量
  • 安装中文分词插件(ES仅支持中文分词)

    • ik插件安装到plugins文件夹下
  • 安装postman(提交html数据给ES)模拟web客户端

  • 启动ES:打开bin/elasticsearch.bat

    • 查看集群健康状态:curl -X GET “localhost:9200/_cat/health?v”
    • 查看节点:curl -X GET “localhost:9200/_cat/nodes?v”
    • 查看索引:curl -X GET “localhost:9200/_cat/indices?v”
    • 创建索引:curl -X PUT “localhost:9200/test”
    • 删除索引:curl -X DELETE “localhost:9200/test”
  • 使用postman查询

    • 提交数据,PUT localhost:9200/test/_doc/1选择Body,raw,JSON

    • 搜索,GET localhost:9200/test/_search?q=title(/content):xxx

    • 搜索时ES对关键词进行了分词

    • 通过请求体构造复杂搜索条件

2. Spring整合Elasticsearch

  • 引入依赖
    • spring-boot-starter-data-elasticsearch
  • 配置Elasticsearch
    • cluster-name、cluster-nodes(集群的名字,节点)
    • Redis和Es底层都用到了Netty,有启动冲突。解决:在CommunityApplication类加入初始化方法进行配置。
  • Spring Data Elasticsearch(调用API)
    • ElasticsearchTemplate(集成了Es的CRUD方法)
    • ElasticsearchRepository(接口,底层为ElasticsearchTemplate,用起来更方便)

我是使用的elasticsearch是7.15.2版本与6.x相差较大,但使用起来更容易,对于springboot集成elasticsearch只需要添加一个配置类就可以了,并舍弃了6.x版本配置cluster-name、cluster-nodes的功能。

在src/main/java/com/nowcoder/community/config包下新建ElasticSearchClientConfig类,即可连接elasticsearch服务器

@Configuration
public class ElasticSearchClientConfig {

    @Bean
    public RestHighLevelClient restHighLevelClien() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")));
        return client;
    }
}

在CommunityApplication主启动类中添加init方法解决Netty的冲突问题

@PostConstruct
public void init() {
    // 解决netty启动冲突的问题
    // Netty4Utils.setAvailableProcessors
    System.setProperty("es.set.netty.runtime.available.processors", "false");
}

在src/main/java/com/nowcoder/community/dao下新建elasticsearch包,并创建DiscussPostRepository接口

@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {
}

最后创建ElasticsearchTests测试类,测试elasticsearch的基本功能

@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class ElasticsearchTests {

    @Autowired
    private DiscussPostMapper discussPostMapper;

    @Autowired
    private DiscussPostRepository discussPostRepository;

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Test
    public void testInsert() {
        discussPostRepository.save(discussPostMapper.selectDiscussPostById(241));
        discussPostRepository.save(discussPostMapper.selectDiscussPostById(242));
        discussPostRepository.save(discussPostMapper.selectDiscussPostById(243));
    }

    @Test
    public void testInsertList() {
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(101, 0, 100));
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(102, 0, 100));
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(103, 0, 100));
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(111, 0, 100));
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(112, 0, 100));
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(131, 0, 100));
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(132, 0, 100));
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(133, 0, 100));
        discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(134, 0, 100));
    }

    @Test
    public void testUpdate() {
        DiscussPost post = discussPostMapper.selectDiscussPostById(231);
        post.setContent("我是新人,使劲灌水");
        discussPostRepository.save(post);
    }

    @Test
    public void testDelete() {
        // discussPostRepository.deleteById(231);
        discussPostRepository.deleteAll();
    }

    @Test
    public void testSearchByRepository() {
        NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.multiMatchQuery("互联网寒冬", "title", "content"))
                .withSorts(SortBuilders.fieldSort("type").order(SortOrder.DESC))
                .withSorts(SortBuilders.fieldSort("score").order(SortOrder.DESC))
                .withSorts(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
                .withPageable(PageRequest.of(0, 10))
                .withHighlightFields(
                        new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
                        new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
                ).build();

        // 底层获取得到了高亮显示的值,但是没有返回
        SearchHits<DiscussPost> hits = elasticsearchRestTemplate.search(searchQuery, DiscussPost.class);
        for (SearchHit<DiscussPost> hit : hits.getSearchHits()) {
            DiscussPost post = hit.getContent();
            for (String t : hit.getHighlightField("title")) {
                post.setTitle(t);
            }
            for (String c : hit.getHighlightField("content")) {
                post.setTitle(c);
            }
            System.out.println(post);
        }
    }
}

3. 开发社区搜索功能

搜索服务

  • 将帖子保存至Elasticsearch服务器。
    • 对贴子实体类DiscussPost用注解进行相关配置
    • 从Mybatis取数据存入
    • 在dao层创建DiscussPostRepository类,继承ElasticsearchRepository接口即可,它集成了CRUD方法
  • 从Elasticsearch服务器删除帖子。
  • 从Elasticsearch服务器搜索帖子。
    • Es可以在搜索到的词加标签,达到高亮显示
    • 利用elasticTemplate.queryForPage()查询

发布事件

  • 发布帖子时,将帖子异步的提交到Elasticsearch服务器。
    • 新建ElasticsearchService类,定义CRUD和搜索方法。
    • 在DiscussPostController类发帖时,定义和触发发帖事件(Event、eventProducer.fireEvent(event))
  • 增加评论时,将帖子异步的提交到Elasticsearch服务器。
    • 在CommentController类发表评论时,定义和触发发帖事件
  • 在消费组件中增加一个方法,消费帖子发布事件。
    • 在EventConsumer类增加消费发帖事件的方法
    • 在事件中查询帖子,存到Es服务器

显示结果

  • 在控制器中处理搜索请求,在HTML上显示搜索结果。
    • 新建SearchController类处理搜索请求
    • 此时为GET请求,keyword的传入(search?keyword=xxx)
    • 修改index.html,表单提交路径,文本框name=“keyword”
    • 在search.html修改,遍历取到帖子。

DEBUG

  • 记得要在kafka创建新的TOPIC,坑爹的debug了好久。

在src/main/java/com/nowcoder/community/service包下新建ElasticsearchService提供搜索业务

@Service
public class ElasticsearchService {

    @Autowired
    private DiscussPostRepository discussPostRepository;

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    public void saveDiscussPost(DiscussPost post) {
        discussPostRepository.save(post);
    }

    public void deleteDiscussPost(int id) {
        discussPostRepository.deleteById(id);
    }

    public SearchHits<DiscussPost> searchDiscussPost(String keyword, int current, int limit) {
        NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.multiMatchQuery(keyword, "title", "content"))
                .withSorts(SortBuilders.fieldSort("type").order(SortOrder.DESC))
                .withSorts(SortBuilders.fieldSort("score").order(SortOrder.DESC))
                .withSorts(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
                .withPageable(PageRequest.of(current, limit))
                .withHighlightFields(
                        new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
                        new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
                ).build();

        return elasticsearchRestTemplate.search(searchQuery, DiscussPost.class);
    }
}

新建SearchController,建立搜索请求,返回es服务器的数据

@Controller
public class SearchController implements CommunityConstant {

    @Autowired
    private ElasticsearchService elasticsearchService;

    @Autowired
    private UserService userService;

    @Autowired
    private LikeService likeService;

    // search?keyword=xxx
    @RequestMapping(path = "/search", method = RequestMethod.GET)
    public String search(String keyword, Page page, Model model) {
        // 搜索帖子
        SearchHits<DiscussPost> searchHits = elasticsearchService.searchDiscussPost(keyword, page.getCurrent() - 1, page.getLimit());

        // 聚合数据
        List<Map<String, Object>> discussPosts = new ArrayList<>();
        if (searchHits != null) {
            for (SearchHit<DiscussPost> searchHit : searchHits) {
                DiscussPost post = searchHit.getContent();
                for (String t : searchHit.getHighlightField("title")) {
                    post.setTitle(t);
                }
                for (String c : searchHit.getHighlightField("content")) {
                    post.setContent(c);
                }
                Map<String, Object> map = new HashMap<>();
                // 帖子
                map.put("post", post);
                // 作者
                map.put("user", userService.findUserById(post.getUserId()));
                // 点赞数量
                map.put("likeCount", likeService.findEntityLikeCount(ENTITY_TYPE_POST, post.getId()));
                discussPosts.add(map);
            }
        }
        model.addAttribute("discussPosts", discussPosts);
        model.addAttribute("keyword", keyword);

        // 分页信息
        page.setPath("/search?keyword=" + keyword);
        page.setRows(searchHits == null ? 0 : (int) searchHits.getTotalHits());

        return "/site/search";
    }
}

修改search.html和index.html展示搜索结果

为了在发布新的帖子和增加评论后,es服务器仍然能够搜索到结果,需要在发帖和评论后同时同步es服务器中的数据,需要在CommentController和DiscussPostController中发帖和评论后触发相关事件,同步数据到es服务器。

DiscussPostController中的addDiscussPost方法修改为

@RequestMapping(path = "/add", method = RequestMethod.POST)
@ResponseBody
public String addDiscussPost(String title, String content) {
    User user = hostHolder.getUser();
    if (user == null) {
        return CommunityUtil.getJSONString(403, "你还没有登录哦");
    }
    DiscussPost post = new DiscussPost();
    post.setUserId(user.getId());
    post.setTitle(title);
    post.setContent(content);
    post.setCreateTime(new Date());
    discussPostService.addDiscussPost(post);

    // 触发发帖事件
    Event event = new Event()
            .setTopic(TOPIC_PUBLISH)
            .setUserId(user.getId())
            .setEntityType(ENTITY_TYPE_POST)
            .setEntityId(post.getId());
    eventProducer.fireEvent(event);

    // 报错的情况将来统一处理
    return CommunityUtil.getJSONString(200, "发布成功");
}

CommentController中的addComment修改为

@RequestMapping(path = "/add/{discussPostId}", method = RequestMethod.POST)
public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
    comment.setUserId(hostHolder.getUser().getId());
    comment.setStatus(0);
    comment.setCreateTime(new Date());
    commentService.addComment(comment);

    // 触发评论事件
    Event event = new Event()
            .setTopic(TOPIC_COMMENT)
            .setUserId(hostHolder.getUser().getId())
            .setEntityType(comment.getEntityType())
            .setEntityId(comment.getEntityId())
            .setData("postId", discussPostId);

    if (comment.getEntityType() == ENTITY_TYPE_POST) {
        DiscussPost targe = discussPostService.findDiscussPostById(comment.getEntityId());
        event.setEntityUserId(targe.getUserId());
    } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
        Comment target = commentService.findCommentById(comment.getEntityId());
        event.setEntityUserId(target.getUserId());
    }
    eventProducer.fireEvent(event);

    if (comment.getEntityType() == ENTITY_TYPE_POST) {
        // 触发发帖事件
        event = new Event()
                .setTopic(TOPIC_PUBLISH)
                .setUserId(comment.getUserId())
                .setEntityType(ENTITY_TYPE_POST)
                .setEntityId(discussPostId);
        eventProducer.fireEvent(event);
    }

    return "redirect:/discuss/detail/" + discussPostId;
}

最后在EventConsumer中增加消费发帖事件的方法

// 消费发帖事件
@KafkaListener(topics = {TOPIC_PUBLISH})
public void handlePublishMessage(ConsumerRecord record) {
    if (record == null || record.value() == null) {
        logger.error("消息的内容为空");
        return;
    }

    Event event = JSONObject.parseObject(record.value().toString(), Event.class);
    if (event == null) {
        logger.error("消息格式错误");
        return;
    }

    DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());
    elasticsearchService.saveDiscussPost(post);
}

d.value() == null) {
logger.error(“消息的内容为空”);
return;
}

Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
    logger.error("消息格式错误");
    return;
}

DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());
elasticsearchService.saveDiscussPost(post);

}


举报

相关推荐

0 条评论