0
点赞
收藏
分享

微信扫一扫

Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取

萧让听雪 2022-04-13 阅读 51

前言

本科毕业后就一直从事Java开发的工作,和多数人一样,最开始从事crud的工作,看着自己的同学一步一步往上走,自己还是在原地踏步,说实话这不是自己想要的状态。

一年半后开始沪漂生活,又摸爬滚打了一年半,薪资基本上在16K。疫情在家开始了系统的学习,目的就是为了进大厂,得到更多的学习机会,也是为了自己到所谓的“35岁”之后,也能在老家恰一口好饭。

努力也没有白费,8个月时间的系统性学习成功上岸Alibaba,直接涨薪14K,下面分享一下我的学习指南,面试真题以及个人心得体会。(晒一下入职offer)

KAFKA

kafka:

ָkafka服务器地址,可以指定多个

bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094

#=============== producer生产者配置 =======================

producer:

retries: 0

每次批量发送消息的数量

batch-size: 16384

缓存容量

buffer-memory: 33554432

ָ指定消息key和消息体的编解码方式

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

#=============== consumer消费者配置 =======================

consumer:

#指定默认消费者的group id

group-id: test-app

#earliest

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

#latest

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

#none

#topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: latest

enable-auto-commit: true

auto-commit-interval: 100ms

#指定消费key和消息体的编解码方式

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

好了,配置工作准备完毕。

我们先来搞Kafka的生产者,也就是负责推送消息的模块:

创建一个类, 叫KafkaSender(注解不能少,留意代码),

package com.kafkademo.producer;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Component;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

/**

  • Hello!

  • Created By JCccc on 2018/11/24

  • 11:25

*/

@Component

public class KafkaSender {

@Autowired

private KafkaTemplate<String, Object> kafkaTemplate;

private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);

public void send(String topic, String taskid, String jsonStr) {

//发送消息

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr);

future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {

@Override

//推送成功

public void onSuccess(SendResult<String, Object> result) {

logger.info(topic + " 生产者 发送消息成功:" + result.toString());

}

@Override

//推送失败

public void onFailure(Throwable ex) {

logger.info(topic + " 生产者 发送消息失败:" + ex.getMessage());

}

});

}

}

以上就是kafka生产者了,到此刻,你已经可以开始往kafka服务器推送消息了

事不宜迟,我们立马试试:

创建个controller,搞个接口试试推送下消息,

@GetMapping(“/sendMessageToKafka”)

public String sendMessageToKafka() {

Map<String,String> messageMap=new HashMap();

messageMap.put(“message”,“我是一条消息”);

String taskid=“123456”;

String jsonStr=JSONObject.toJSONString(messageMap);

//kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)

kafkaSender.send(“testTopic”,taskid,jsonStr);

return “hi guy!”;

}

用postman测一下(对了,这些推送的前提是你的kafka服务器是没问题的,能正常连接)

看看控制台反应:

可以看到,我们的kafka生产者再推送消息成功后,成功进入了我们的回调函数onSuccess,也打印了日志。

《一线大厂Java面试真题解析+Java核心总结学习笔记+最新全套讲解视频+实战项目源码》开源

最后

这份《“java高分面试指南”-25分类227页1000+题50w+字解析》同样可分享给有需要的朋友,感兴趣的伙伴们可挑战一下自我,在不看答案解析的情况,测试测试自己的解题水平,这样也能达到事半功倍的效果!(好东西要大家一起看才香)

image

image

最后

这份《“java高分面试指南”-25分类227页1000+题50w+字解析》同样可分享给有需要的朋友,感兴趣的伙伴们可挑战一下自我,在不看答案解析的情况,测试测试自己的解题水平,这样也能达到事半功倍的效果!(好东西要大家一起看才香)

[外链图片转存中…(img-sVBHqISy-1649569989314)]

[外链图片转存中…(img-xUPVf1pb-1649569989314)]

举报

相关推荐

0 条评论