创建项目测试Kafka
创建项目knows-kafka
我们创建knows-kafka项目
用于测试kafka基本功能
编写发送和接收消息的测试
创建过程不需要勾选任何内容
父子相认
<module>knows-kafka</module>
子项目pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.tedu</groupId>
<artifactId>knows</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.tedu</groupId>
<artifactId>knows-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>knows-kafka</name>
<description>Demo project for Spring Boot</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Google JSON API -->
<!-- 将java对象和json格式字符串相互转换 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- Kafka API -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>
application.properties配置如下
# 配置kafka位置
spring.kafka.bootstrap-servers=localhost:9092
# spring-kafka要求我们必须为当前项目的话题定义一个分组名称
# 以便于即使有不同的项目使用了相同的话题名称,也能区分开
# 这个配置是强制的,不配置会报错
spring.kafka.consumer.group-id=knows
# 日志门槛
logging.level.cn.tedu.knows.kafka=debug
SpringBoot启动类
@SpringBootApplication
// 启动项目添加kafka的支持
@EnableKafka
// 启动SpringBoot自带的定时任务功能,用于周期性的调用发布信息的方法
// 和kafka没有直接关系,只是本次测试需要使用到它
@EnableScheduling
public class KnowsKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KnowsKafkaApplication.class, args);
}
}
创建消息类
和数据库与ES一样
要向Kafka发消息,也要创建一个能够封装数据的类
我们在项目中创建一个vo包
包中创建Message类用于封装消息数据
@Data
@Accessors(chain = true)
public class Message implements Serializable {
private Integer id;
private String content;
private Long time;
}
编写消息发送
下面来发送消息
我们这里测试发送消息的方式是利用SpringBoot的定时任务
每隔10秒向kafka发送一条消息,这个消息就是Message对象的json格式字符串,我们创建一个demo包,保重编写一个Producer代码如下
@Component
@Slf4j
public class Producer {
// 当前类的目标是每隔10秒钟向kafka发送消息
// KafkaTemplate是Spring-kafka框架提供的操作kafka的对象
// 泛型<[话题名称的类型],[传递消息的类型]>
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
int i=1;
// 利用SpringBoot的定时任务,每隔10秒运行一次下面的方法
// 这个方法会向kafka发送一条消息
@Scheduled(fixedRate = 10000)
public void sendMessage(){
// 实例化一个Message对象
Message message=new Message()
.setId(i++)
.setContent("向kafka发送消息了")
.setTime(System.currentTimeMillis());
// 实例化Gson对象将message转换为json格式字符串
Gson gson=new Gson();
String json=gson.toJson(message);
log.debug("即将发送信息:{}",json);
// 执行消息发送
kafkaTemplate.send("myTopic",json);
log.debug("消息已发送!");
}
}
编写消息的接收
继续在demo包中创建一个类Consumer
类中编写接收消息的方法
代码如下
@Component
@Slf4j
public class Consumer {
// Spring-kafka框架中提供了一个kafka监听器
// 当指定话题有消息出现时,这个监听器就会自动调用对应的方法
// 而且会将消息接收,并传递给该方法的参数
@KafkaListener(topics = "myTopic")
public void receive(ConsumerRecord<String,String> record){
// 当myTopic话题中有新消息时,监听器会自动调用这个方法
// 参数record就是发送到myTopic话题的消息
// ConsumerRecord<[话题名称的类型],[消息的类型]>
// 下面我们获得record中的消息内容
String json=record.value();
// 将json转换为java对象Message类型
Gson gson=new Gson();
Message message=gson.fromJson(json,Message.class);
// 日志输出获得的java对象信息
log.debug("接收到的Message:{}",message);
}
}
编写完之后
启动zookeeper和kafka
在运行kafka项目
观察控制台,每10秒发送一次信息
实现新增问题同步到ES
我们需要利用上面章节学习的kafka功能来完善达内知道项目新增问题并新增到ES的功能
- faq模块新增question时,将question对象发送给kafka
- search模块监听kafka中保存question对象的话题,当有消息时,将question读取并保存到ES
统一话题名称
faq模块和search模块是两个不同的项目
话题名称的对应是能够在kafka中相互通信的唯一保证
我们一般会在comments模块中,定义一个常量,供这两个项目使用
两个项目调用相同的一个常量值,他们的话题名称一定是一致的
转到knows-commons项目
创建一个vo包,包中创建一个Topic类
定义常量代码如下
public class Topic {
public static final String QUESTION="knows_add";
}
其实这个类中的常量名称和常量的值都不重要,关键是有了它,两个项目(生产者和消费者)能指向同一个常量
faq模块发送消息
转到knows-faq模块
先添加相关依赖
<!-- Google JSON API -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- Kafka API -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.properties
# kafka相关配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=knows
SpringBoot启动类
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("cn.tedu.knows.faq.mapper")
// ↓↓↓↓↓↓↓↓↓↓
@EnableKafka
public class KnowsFaqApplication {
// 其它代码略
}
下面开始编写faq模块中将新增成功的question对象发送给kafka的代码
创建一个kafka包
包中创建QuestionProducer类,完成发送工作
代码如下
@Component
@Slf4j
public class QuestionProducer {
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
// 定义新增成功的question为方法的参数
// 编写一个能够将这个question发送到kafka的方法
public void sendQuestion(Question question){
// 将question对象转换为json字符串
Gson gson=new Gson();
String json=gson.toJson(question);
log.debug("要发送的内容为:{}",json);
kafkaTemplate.send(Topic.QUESTION , json);
}
}
我们找到QuestionServiceImpl类,添加上面类的依赖注入,然后找到用户新增问题方法的最后
添加调用上面方法的代码
这样就能实现新增的问题发送给kafka了
@Resource
private QuestionProducer questionProducer;
//此处省略很多代码...
@Override
@Transactional
public void saveQuestion(String username, QuestionVo questionVo) {
//此处省略将问题保存到数据库的过程 ...
//将刚刚新增完成的问题,发送到kafka
//以便search模块接收并新增到Es
questionProducer.sendQuestion(question);
}
到此为止
我们达内知道项目在新增问题时
就会将问题新增到数据库后,再发送给kafka了
search模块接收消息
转到knows-search模块
和faq模块一样,也要添加相关依赖和配置
先添加相关依赖
<!-- Google JSON API -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- Kafka API -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.properties
# kafka相关配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=knows
SpringBoot启动类
@SpringBootApplication
@EnableDiscoveryClient
// ↓↓↓↓↓↓↓↓↓↓
@EnableKafka
public class KnowsSearchApplication {
// 其它代码略
}
相关配置完毕
我们需要将kafka监听器调用指定方法看做是一个特殊的控制层方法
这个控制层方法需要调用业务逻辑层方法完成新增
然而我们当前search模块没有新增QuestionVO到ES的业务逻辑层方法
所以我们要先在search模块的业务逻辑层方法中新增它