一般情况下,我们在写springboot使用Kafka监听的代码时,都是直接写个类,然后在方法上加个@KafkaListener就可以了,简单省事。
就像下面这样
@Component
@Slf4j
public class KafkaConsumer {
@Autowired
private KafkaCustomProperties kafkaCustomProperties;
@KafkaListener(topics = {"#{@kafkaCustomProperties.topic}"}, groupId = "#{@kafkaCustomProperties.groupId}")
public void listen(String message) {
log.info("接收到的消息:{}", message);
// do something...
}
}
这样做其实是没问题的,但有的时候我们的kafka服务会莫名其妙停掉,然后就一直报监听不到Kafka服务的错误信息,又不想改代码,就可以使用spring的Condition机制,在启动springboot服务时,先判断一下能不能连上Kafka服务,如果连不上,就不注入KafkaConsumer类。
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
/**
* @Author: 夏威夷8080
* @Date: 2011/6/7 9:38
*/
@Slf4j
public class KafkaConnectCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
Environment environment = context.getEnvironment();
String kafkaServers = environment.getProperty("spring.kafka.consumer.bootstrap-servers");
log.info("获取到的kafkaServers:{}", kafkaServers);
if (StringUtils.isBlank(kafkaServers)){
return false;
}
String serverPort = kafkaServers.split(",")[0];
URI uri = URI.create("http://" + serverPort);
return this.isConnectable(uri.getHost(), uri.getPort());
}
/**
* 判断kafka服务能否正常连接
* @param host
* @param port
* @return
*/
private boolean isConnectable(String host, int port) {
boolean result = true;
Socket socket = new Socket();
try {
socket.connect(new InetSocketAddress(host, port),3000);
} catch (IOException e) {
log.error("========注意!!!!!未能连接上kafka服务,意味着kafka监听将不开启,{}:{},{}", host, port, e.getMessage());
result = false;
} finally {
try {
socket.close();
} catch (IOException e) {
log.error("关闭kafka服务socket出错,{}:{},{}", host, port, e.getMessage());
result = false;
}
}
log.info("========kafka服务能正常连接========");
return result;
}
}
在KafkaConsumer类上加上@Conditional(KafkaConnectCondition.class)就可以了。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @Author: 夏威夷8080
* @Date: 2011/5/11 19:33
*/
@Component
@Slf4j
@Conditional(KafkaConnectCondition.class)
public class KafkaConsumer {
@Autowired
private SyncHandlerFactory syncHandlerFactory;
@Autowired
private KafkaCustomProperties kafkaCustomProperties;
// 先屏蔽监听,后面再放开
@KafkaListener(topics = {"#{@kafkaCustomProperties.topic}"}, groupId = "#{@kafkaCustomProperties.groupId}")
public void listen(String message) {
log.info("接收到的消息:{}", message);
// do something...
}
}
这样修改之后,每次在启动springboot服务时都会检查下Kafka能不能正常连接上,相当于做一个容错的处理吧。