文章目录
项目源码可以在 https://gitee.com/ShayneC/community获取
Elasticsearch,分布式搜索引擎
1. Elasticsearch入门
Elasticsearch简介
- 一个分布式的、Restful风格(请求标准的描述)的搜索引擎。
- 支持对各种类型的数据的检索。
- 搜索速度快,可以提供实时的搜索服务。
- 便于水平扩展,每秒可以处理PB级海量数据。
Elasticsearch术语
- 索引(对应数据库)、类型(对应表)、文档(表里一行)、字段(一列)。 最新的版本类型被废弃。
- 集群(服务器组合在一起)、节点(集群中每台服务器)、分片(对索引的划分)、副本(分片的备份)。
通过ES搜索的数据必须要在ES中转存一份,某种角度来说它是一个数据库。
Elasticsearch使用
-
安装、修改配置文件
- elasticsearch.yml文件,修改cluster.name,path.data,path.logs
- 配置环境变量
-
安装中文分词插件(ES仅支持中文分词)
- ik插件安装到plugins文件夹下
-
安装postman(提交html数据给ES)模拟web客户端
-
启动ES:打开bin/elasticsearch.bat
- 查看集群健康状态:curl -X GET “localhost:9200/_cat/health?v”
- 查看节点:curl -X GET “localhost:9200/_cat/nodes?v”
- 查看索引:curl -X GET “localhost:9200/_cat/indices?v”
- 创建索引:curl -X PUT “localhost:9200/test”
- 删除索引:curl -X DELETE “localhost:9200/test”
-
使用postman查询
-
提交数据,PUT localhost:9200/test/_doc/1选择Body,raw,JSON
-
搜索,GET localhost:9200/test/_search?q=title(/content):xxx
-
搜索时ES对关键词进行了分词
-
通过请求体构造复杂搜索条件
-
2. Spring整合Elasticsearch
- 引入依赖
- spring-boot-starter-data-elasticsearch
- 配置Elasticsearch
- cluster-name、cluster-nodes(集群的名字,节点)
- Redis和Es底层都用到了Netty,有启动冲突。解决:在CommunityApplication类加入初始化方法进行配置。
- Spring Data Elasticsearch(调用API)
- ElasticsearchTemplate(集成了Es的CRUD方法)
- ElasticsearchRepository(接口,底层为ElasticsearchTemplate,用起来更方便)
我是使用的elasticsearch是7.15.2版本与6.x相差较大,但使用起来更容易,对于springboot集成elasticsearch只需要添加一个配置类就可以了,并舍弃了6.x版本配置cluster-name、cluster-nodes的功能。
在src/main/java/com/nowcoder/community/config包下新建ElasticSearchClientConfig类,即可连接elasticsearch服务器
@Configuration
public class ElasticSearchClientConfig {
@Bean
public RestHighLevelClient restHighLevelClien() {
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("127.0.0.1", 9200, "http")));
return client;
}
}
在CommunityApplication主启动类中添加init方法解决Netty的冲突问题
@PostConstruct
public void init() {
// 解决netty启动冲突的问题
// Netty4Utils.setAvailableProcessors
System.setProperty("es.set.netty.runtime.available.processors", "false");
}
在src/main/java/com/nowcoder/community/dao下新建elasticsearch包,并创建DiscussPostRepository接口
@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {
}
最后创建ElasticsearchTests测试类,测试elasticsearch的基本功能
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class ElasticsearchTests {
@Autowired
private DiscussPostMapper discussPostMapper;
@Autowired
private DiscussPostRepository discussPostRepository;
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
@Test
public void testInsert() {
discussPostRepository.save(discussPostMapper.selectDiscussPostById(241));
discussPostRepository.save(discussPostMapper.selectDiscussPostById(242));
discussPostRepository.save(discussPostMapper.selectDiscussPostById(243));
}
@Test
public void testInsertList() {
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(101, 0, 100));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(102, 0, 100));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(103, 0, 100));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(111, 0, 100));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(112, 0, 100));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(131, 0, 100));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(132, 0, 100));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(133, 0, 100));
discussPostRepository.saveAll(discussPostMapper.selectDiscussPosts(134, 0, 100));
}
@Test
public void testUpdate() {
DiscussPost post = discussPostMapper.selectDiscussPostById(231);
post.setContent("我是新人,使劲灌水");
discussPostRepository.save(post);
}
@Test
public void testDelete() {
// discussPostRepository.deleteById(231);
discussPostRepository.deleteAll();
}
@Test
public void testSearchByRepository() {
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery("互联网寒冬", "title", "content"))
.withSorts(SortBuilders.fieldSort("type").order(SortOrder.DESC))
.withSorts(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSorts(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(0, 10))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
// 底层获取得到了高亮显示的值,但是没有返回
SearchHits<DiscussPost> hits = elasticsearchRestTemplate.search(searchQuery, DiscussPost.class);
for (SearchHit<DiscussPost> hit : hits.getSearchHits()) {
DiscussPost post = hit.getContent();
for (String t : hit.getHighlightField("title")) {
post.setTitle(t);
}
for (String c : hit.getHighlightField("content")) {
post.setTitle(c);
}
System.out.println(post);
}
}
}
3. 开发社区搜索功能
搜索服务
- 将帖子保存至Elasticsearch服务器。
- 对贴子实体类DiscussPost用注解进行相关配置
- 从Mybatis取数据存入
- 在dao层创建DiscussPostRepository类,继承ElasticsearchRepository接口即可,它集成了CRUD方法
- 从Elasticsearch服务器删除帖子。
- 从Elasticsearch服务器搜索帖子。
- Es可以在搜索到的词加标签,达到高亮显示
- 利用elasticTemplate.queryForPage()查询
发布事件
- 发布帖子时,将帖子异步的提交到Elasticsearch服务器。
- 新建ElasticsearchService类,定义CRUD和搜索方法。
- 在DiscussPostController类发帖时,定义和触发发帖事件(Event、eventProducer.fireEvent(event))
- 增加评论时,将帖子异步的提交到Elasticsearch服务器。
- 在CommentController类发表评论时,定义和触发发帖事件
- 在消费组件中增加一个方法,消费帖子发布事件。
- 在EventConsumer类增加消费发帖事件的方法
- 在事件中查询帖子,存到Es服务器
显示结果
- 在控制器中处理搜索请求,在HTML上显示搜索结果。
- 新建SearchController类处理搜索请求
- 此时为GET请求,keyword的传入(search?keyword=xxx)
- 修改index.html,表单提交路径,文本框name=“keyword”
- 在search.html修改,遍历取到帖子。
DEBUG
- 记得要在kafka创建新的TOPIC,坑爹的debug了好久。
在src/main/java/com/nowcoder/community/service包下新建ElasticsearchService提供搜索业务
@Service
public class ElasticsearchService {
@Autowired
private DiscussPostRepository discussPostRepository;
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
public void saveDiscussPost(DiscussPost post) {
discussPostRepository.save(post);
}
public void deleteDiscussPost(int id) {
discussPostRepository.deleteById(id);
}
public SearchHits<DiscussPost> searchDiscussPost(String keyword, int current, int limit) {
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery(keyword, "title", "content"))
.withSorts(SortBuilders.fieldSort("type").order(SortOrder.DESC))
.withSorts(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSorts(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(current, limit))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
return elasticsearchRestTemplate.search(searchQuery, DiscussPost.class);
}
}
新建SearchController,建立搜索请求,返回es服务器的数据
@Controller
public class SearchController implements CommunityConstant {
@Autowired
private ElasticsearchService elasticsearchService;
@Autowired
private UserService userService;
@Autowired
private LikeService likeService;
// search?keyword=xxx
@RequestMapping(path = "/search", method = RequestMethod.GET)
public String search(String keyword, Page page, Model model) {
// 搜索帖子
SearchHits<DiscussPost> searchHits = elasticsearchService.searchDiscussPost(keyword, page.getCurrent() - 1, page.getLimit());
// 聚合数据
List<Map<String, Object>> discussPosts = new ArrayList<>();
if (searchHits != null) {
for (SearchHit<DiscussPost> searchHit : searchHits) {
DiscussPost post = searchHit.getContent();
for (String t : searchHit.getHighlightField("title")) {
post.setTitle(t);
}
for (String c : searchHit.getHighlightField("content")) {
post.setContent(c);
}
Map<String, Object> map = new HashMap<>();
// 帖子
map.put("post", post);
// 作者
map.put("user", userService.findUserById(post.getUserId()));
// 点赞数量
map.put("likeCount", likeService.findEntityLikeCount(ENTITY_TYPE_POST, post.getId()));
discussPosts.add(map);
}
}
model.addAttribute("discussPosts", discussPosts);
model.addAttribute("keyword", keyword);
// 分页信息
page.setPath("/search?keyword=" + keyword);
page.setRows(searchHits == null ? 0 : (int) searchHits.getTotalHits());
return "/site/search";
}
}
修改search.html和index.html展示搜索结果
为了在发布新的帖子和增加评论后,es服务器仍然能够搜索到结果,需要在发帖和评论后同时同步es服务器中的数据,需要在CommentController和DiscussPostController中发帖和评论后触发相关事件,同步数据到es服务器。
DiscussPostController中的addDiscussPost方法修改为
@RequestMapping(path = "/add", method = RequestMethod.POST)
@ResponseBody
public String addDiscussPost(String title, String content) {
User user = hostHolder.getUser();
if (user == null) {
return CommunityUtil.getJSONString(403, "你还没有登录哦");
}
DiscussPost post = new DiscussPost();
post.setUserId(user.getId());
post.setTitle(title);
post.setContent(content);
post.setCreateTime(new Date());
discussPostService.addDiscussPost(post);
// 触发发帖事件
Event event = new Event()
.setTopic(TOPIC_PUBLISH)
.setUserId(user.getId())
.setEntityType(ENTITY_TYPE_POST)
.setEntityId(post.getId());
eventProducer.fireEvent(event);
// 报错的情况将来统一处理
return CommunityUtil.getJSONString(200, "发布成功");
}
CommentController中的addComment修改为
@RequestMapping(path = "/add/{discussPostId}", method = RequestMethod.POST)
public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
comment.setUserId(hostHolder.getUser().getId());
comment.setStatus(0);
comment.setCreateTime(new Date());
commentService.addComment(comment);
// 触发评论事件
Event event = new Event()
.setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUser().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId", discussPostId);
if (comment.getEntityType() == ENTITY_TYPE_POST) {
DiscussPost targe = discussPostService.findDiscussPostById(comment.getEntityId());
event.setEntityUserId(targe.getUserId());
} else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
Comment target = commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}
eventProducer.fireEvent(event);
if (comment.getEntityType() == ENTITY_TYPE_POST) {
// 触发发帖事件
event = new Event()
.setTopic(TOPIC_PUBLISH)
.setUserId(comment.getUserId())
.setEntityType(ENTITY_TYPE_POST)
.setEntityId(discussPostId);
eventProducer.fireEvent(event);
}
return "redirect:/discuss/detail/" + discussPostId;
}
最后在EventConsumer中增加消费发帖事件的方法
// 消费发帖事件
@KafkaListener(topics = {TOPIC_PUBLISH})
public void handlePublishMessage(ConsumerRecord record) {
if (record == null || record.value() == null) {
logger.error("消息的内容为空");
return;
}
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
logger.error("消息格式错误");
return;
}
DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());
elasticsearchService.saveDiscussPost(post);
}
d.value() == null) {
logger.error(“消息的内容为空”);
return;
}
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
logger.error("消息格式错误");
return;
}
DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());
elasticsearchService.saveDiscussPost(post);
}