0
点赞
收藏
分享

微信扫一扫

spring整合kafka项目生产和消费测试结果记录(一)

使用spring+springMVC+mybatis+kafka做了两个web项目,一个是生产者,一个是消费者。

通过JMeter测试工具模拟100个用户并发访问生产者项目,发送json数据给生产者的接口,生产者将json数据发送到kafka集群,

消费者监听到kafka集群中的消息就开始消费,并将json解析成对象存到MySQL数据库。

下面是使用JMeter测试工具模拟100个并发的线程设置截图:

spring整合kafka项目生产和消费测试结果记录(一)_Mybatis

请求所发送的数据:

spring整合kafka项目生产和消费测试结果记录(一)_consumer_02

 

下面是100个用户10000个请求的聚合报告:

spring整合kafka项目生产和消费测试结果记录(一)_Mybatis_03

 

下面是生产者截图生产完10000条消息的时间截图:

spring整合kafka项目生产和消费测试结果记录(一)_kafka_04

 

下面是消费者项目消费入库的结束时间截图:

spring整合kafka项目生产和消费测试结果记录(一)_kafka_05

 

可见,10000条消息从生产完成到入库(消费完10000条消息的时间只是比生产完成的时间落后了几十秒,但是消费端真正入库完成所需要的时间很长)完成时间相差了10几分钟。

 

下面是MySQL数据库截图,数据全部入库成功:

spring整合kafka项目生产和消费测试结果记录(一)_Spring_06

下面是消息对应的POJO:

1 package com.xuebusi.pojo;
2
3 public class TbPerson {
4 private Long id;
5
6 private String name;
7
8 private Integer age;
9
10 public Long getId() {
11 return id;
12 }
13
14 public void setId(Long id) {
15 this.id = id;
16 }
17
18 public String getName() {
19 return name;
20 }
21
22 public void setName(String name) {
23 this.name = name == null ? null : name.trim();
24 }
25
26 public Integer getAge() {
27 return age;
28 }
29
30 public void setAge(Integer age) {
31 this.age = age;
32 }
33
34 @Override
35 public String toString() {
36 return "TbPerson [id=" + id + ", name=" + name + ", age=" + age + "]";
37 }
38

 

下面是生产端的逻辑:

1 package com.xuebusi.controller;
2
3 import com.alibaba.fastjson.JSON;
4 import com.xuebusi.pojo.TbPerson;
5 import com.xuebusi.service.KafkaService;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import org.springframework.stereotype.Controller;
9 import org.springframework.web.bind.annotation.RequestBody;
10 import org.springframework.web.bind.annotation.RequestMapping;
11 import org.springframework.web.bind.annotation.RequestMethod;
12 import org.springframework.web.bind.annotation.ResponseBody;
13
14 import javax.annotation.Resource;
15
16 @Controller
17 @RequestMapping("/producer")
18 public class KafkaController {
19
20 private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);
21
22 @Resource
23 private KafkaService kafkaService;
24
25 /**
26 * 发消息到ssmk这个topic
27 * @param person
28 * @return
29 */
30 @RequestMapping(value = "/person", method = RequestMethod.POST)
31 @ResponseBody
32 public String createPerson(@RequestBody TbPerson person) {
33 if (person == null){
34 return "fail, data can not be null.";
35 }
36 String json = JSON.toJSONString(person);
37 boolean result = kafkaService.sendInfo("ssmk", json);
38 logger.info("生产者发送消息[" + result + "]:" + json);
39 return "success";
40 }
41

 

下面是消费端的逻辑:

1 package com.xuebusi.consumer;
2
3 import com.alibaba.fastjson.JSON;
4 import com.xuebusi.pojo.TbPerson;
5 import com.xuebusi.service.PersonService;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import org.springframework.beans.factory.annotation.Autowired;
9 import org.springframework.stereotype.Service;
10
11 import java.util.List;
12 import java.util.Map;
13
14 @Service
15 public class KafkaConsumerService {
16 private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
17
18 @Autowired
19 private PersonService personService;
20
21 public void processMessage(Map<String, Map<Integer, String>> msgs) {
22 /*for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
23 String topic = entry.getKey();
24 Map<Integer, String> value = entry.getValue();
25 for (Map.Entry<Integer, String> entrySet : value.entrySet()) {
26 Integer partiton = entrySet.getKey();
27 String msg = entrySet.getValue();
28 logger.info("消费的主题:" + topic + ",消费的分区:" + partiton + ",消费的消息:" + msg);
29 logger.info("=======使用JSON解析对象=========");
30 TbPerson person = JSON.parseObject(msg, TbPerson.class);
31 logger.info("=======对象开始入库=========");
32 personService.insert(person);
33 logger.info("=======对象入库成功=========");
34 }
35 }*/
36
37 for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
38 String topic = entry.getKey();
39 Map<Integer, String> value = entry.getValue();
40 for (Map.Entry<Integer, String> entrySet : value.entrySet()) {
41 Integer partiton = entrySet.getKey();
42 String msg = entrySet.getValue();
43 logger.info("消费的主题:" + topic + ",消费的分区:" + partiton + ",消费的消息:" + msg);
44 msg = "[" + msg + "]";//注意这里要在前后都加上中括号,否则下面在解析json成对象的时候会报json格式不对的异常(spring会对多条json数据用逗号分隔)
45 logger.info("=======使用JSON解析对象=========");
46 List<TbPerson> personList = JSON.parseArray(msg, TbPerson.class);
47 //TbPerson person = JSON.parseObject(msg, TbPerson.class);
48 if (personList != null && personList.size() > 0) {
49 logger.info("消息中包含[" + personList.size() + "]个对象");
50 for (TbPerson person : personList) {
51 logger.info("=======对象开始入库=========");
52 personService.insert(person);
53 logger.info("=======对象入库成功=========");
54 }
55 }
56
57 }
58 }
59 }
60




举报

相关推荐

0 条评论