0
点赞
收藏
分享

微信扫一扫

springboot利用Condition机制解决Kafka监听不到服务时报错的问题

一般情况下,我们在写springboot使用Kafka监听的代码时,都是直接写个类,然后在方法上加个@KafkaListener就可以了,简单省事。

就像下面这样

@Component
@Slf4j
public class KafkaConsumer {

@Autowired
private KafkaCustomProperties kafkaCustomProperties;

@KafkaListener(topics = {"#{@kafkaCustomProperties.topic}"}, groupId = "#{@kafkaCustomProperties.groupId}")
public void listen(String message) {
log.info("接收到的消息:{}", message);
// do something...

}

}

这样做其实是没问题的,但有的时候我们的kafka服务会莫名其妙停掉,然后就一直报监听不到Kafka服务的错误信息,又不想改代码,就可以使用spring的Condition机制,在启动springboot服务时,先判断一下能不能连上Kafka服务,如果连不上,就不注入KafkaConsumer类。

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/**
* @Author: 夏威夷8080
* @Date: 2011/6/7 9:38
*/
@Slf4j
public class KafkaConnectCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
Environment environment = context.getEnvironment();
String kafkaServers = environment.getProperty("spring.kafka.consumer.bootstrap-servers");
log.info("获取到的kafkaServers:{}", kafkaServers);
if (StringUtils.isBlank(kafkaServers)){
return false;
}

String serverPort = kafkaServers.split(",")[0];
URI uri = URI.create("http://" + serverPort);

return this.isConnectable(uri.getHost(), uri.getPort());
}

/**
* 判断kafka服务能否正常连接
* @param host
* @param port
* @return
*/
private boolean isConnectable(String host, int port) {
boolean result = true;
Socket socket = new Socket();
try {
socket.connect(new InetSocketAddress(host, port),3000);
} catch (IOException e) {
log.error("========注意!!!!!未能连接上kafka服务,意味着kafka监听将不开启,{}:{},{}", host, port, e.getMessage());
result = false;
} finally {
try {
socket.close();
} catch (IOException e) {
log.error("关闭kafka服务socket出错,{}:{},{}", host, port, e.getMessage());
result = false;
}
}
log.info("========kafka服务能正常连接========");
return result;
}
}

在KafkaConsumer类上加上@Conditional(KafkaConnectCondition.class)就可以了。

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
* @Author: 夏威夷8080
* @Date: 2011/5/11 19:33
*/
@Component
@Slf4j
@Conditional(KafkaConnectCondition.class)
public class KafkaConsumer {

@Autowired
private SyncHandlerFactory syncHandlerFactory;

@Autowired
private KafkaCustomProperties kafkaCustomProperties;

// 先屏蔽监听,后面再放开
@KafkaListener(topics = {"#{@kafkaCustomProperties.topic}"}, groupId = "#{@kafkaCustomProperties.groupId}")
public void listen(String message) {
log.info("接收到的消息:{}", message);
// do something...

}
}

这样修改之后,每次在启动springboot服务时都会检查下Kafka能不能正常连接上,相当于做一个容错的处理吧。



举报

相关推荐

0 条评论