pom文件引入依赖
<!-- Eclipse Paho MQTT v3 client library -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.4</version>
</dependency>
application.yml 加入配置
spring:
  application:
    name: test-porject
  # Bound to MqttProperties through the "spring.mqtt" prefix.
  mqtt:
    # MQTT_HOST must carry host AND port: the fallback after the first ':' is "172.16.10.201:1883".
    url: tcp://${MQTT_HOST:172.16.10.201:1883}
    # The MQTT client id reuses the application name.
    client-id: ${spring.application.name}
    # Topic filters subscribed at startup; MQTT_TOPIC overrides the single default filter.
    topic:
      - ${MQTT_TOPIC:/iot/#}
配置类
import com.workface.fullymechanizemine.common.listener.MqttSubscribeListener;
import com.workface.fullymechanizemine.core.tool.utils.StringUtil;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes the Paho connection options and a connected,
 * subscribed MQTT client, both built from {@link MqttProperties}.
 */
@Configuration
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfiguration {
    private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);

    /** Keep-alive interval handed to the Paho connect options. */
    private static final int KEEP_ALIVE_INTERVAL = 60;

    @Autowired
    private MqttProperties mqttProperties;

    public MqttConfiguration() {
    }

    /**
     * Builds the connection options from the configured broker URL and credentials.
     * User name and password are only applied when they are non-blank.
     *
     * @return the options used for every (re)connect of the MQTT client
     */
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()});
        // The guard must test the user name itself; it previously tested the URL,
        // so a missing user name was still pushed into the options.
        if (StringUtil.isNotBlank(this.mqttProperties.getUsername())) {
            connectOptions.setUserName(this.mqttProperties.getUsername());
        }
        if (StringUtil.isNotBlank(this.mqttProperties.getPassword())) {
            connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());
        }
        connectOptions.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
        return connectOptions;
    }

    /**
     * Creates the MQTT client, connects it and subscribes a {@link MqttSubscribeListener}
     * to every configured topic filter.
     *
     * @param options connection options produced by {@link #mqttConnectOptions()}
     * @return the connected client
     * @throws MqttException if creating the client, connecting or subscribing fails
     */
    @Bean
    public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {
        IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());
        mqttClient.connect(options);

        String[] topics = this.mqttProperties.getTopic();
        if (topics == null) {
            // No topic list configured: return a connected client without subscriptions
            // instead of failing with a NullPointerException.
            log.warn("No MQTT topics configured; client {} has no subscriptions", mqttClient.getClientId());
            return mqttClient;
        }
        for (String topic : topics) {
            mqttClient.subscribe(topic, new MqttSubscribeListener());
        }
        return mqttClient;
    }
}
MqttProperties属性类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Connection settings read from the {@code spring.mqtt.*} configuration keys.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /** Broker URI; used both as the server URI and when constructing the client. */
    private String url;

    /** Client identifier passed to the MQTT client. */
    private String clientId;

    /** Optional user name; only applied to the connect options when non-blank. */
    private String username;

    /** Optional password; only applied to the connect options when non-blank. */
    private String password;

    /** Topic filters subscribed when the client bean is created. */
    private String[] topic;
}
MqttSubscribeListener 订阅监听类
import com.workface.fullymechanizemine.common.event.MqttSubscribeEvent;
import com.workface.fullymechanizemine.core.tool.utils.SpringUtil;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * Paho message listener that turns every incoming MQTT message into a
 * {@link MqttSubscribeEvent} and publishes it through {@link SpringUtil}.
 */
public class MqttSubscribeListener implements IMqttMessageListener {

    /**
     * Decodes the payload as text and republishes it as an application event.
     *
     * @param s           the topic the message arrived on
     * @param mqttMessage the received message
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        // Decode explicitly as UTF-8; the no-charset constructor depends on the
        // platform default and corrupts non-ASCII payloads on some hosts.
        // NOTE(review): assumes publishers send UTF-8 text - confirm with the device side.
        String content = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        MqttSubscribeEvent event = new MqttSubscribeEvent(s, content);
        SpringUtil.publishEvent(event);
    }
}
MqttEventListener 事件监听类
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.workface.fullymechanizemine.common.config.mqtt.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
/**
 * Holder for the application-side handling of MQTT events. Only the topic
 * helper is active; the event consumer itself is kept below as a disabled example.
 */
@Configuration
public class MqttEventListener {
    private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);

    @Resource
    private MqttProperties mqttProperties;

    /**
     * Strips the configured subscription prefix from a concrete topic.
     * Each configured filter has its {@code #} removed; the first resulting
     * prefix that the topic starts with is cut off the front of the topic.
     *
     * @param topic the concrete topic a message arrived on
     * @return the remainder of the topic after the matching prefix, or an empty
     *         string when the topic is {@code null}, no filters are configured,
     *         or no filter matches
     */
    private String processTopic(String topic) {
        String[] filters = mqttProperties.getTopic();
        if (topic == null || filters == null) {
            return StringPool.EMPTY;
        }
        for (String filter : filters) {
            String prefix = filter.replace(StringPool.HASH, StringPool.EMPTY);
            if (topic.startsWith(prefix)) {
                // Cut only the leading prefix. String.replace would also delete
                // later occurrences of the same text inside the topic.
                return topic.substring(prefix.length());
            }
        }
        return StringPool.EMPTY;
    }

    // Disabled example consumer, kept for reference.
    // private static List<AcceptPointDTO> toPoints (Object source) {
    // String str = Objects.toString(source);
    // List<AcceptPointDTO> data = JSONArray.parseArray(str, AcceptPointDTO.class);
    // return data;
    // }
    // @Async
    // @EventListener(MqttSubscribeEvent.class)
    // public void listen (MqttSubscribeEvent event) {
    // String topic = processTopic(event.getTopic());
    // Object source = event.getSource();
    // List<AcceptPointDTO> data = toPoints(source);
    // if (Func.isEmpty(data)) {
    // return;
    // }
    // ConcurrentHashMap<String, WebSocketService> webSocketMap = WebSocketService.getWebSocketMap();
    // if (!Func.isEmpty(webSocketMap) && webSocketMap.size()>=1){
    // for (Map.Entry<String, WebSocketService> entry : webSocketMap.entrySet()) {
    // WebSocketService webSocketService = entry.getValue();
    // try {
    // webSocketService.sendMessage(JSONObject.toJSONString(data));
    // } catch (IOException e) {
    // e.printStackTrace();
    // }
    // }
    // }
    // I_POINT_VALUE_SERVICE.savePointValue(data);
    // }
}
MqttUtil 工具类
import com.workface.fullymechanizemine.core.tool.utils.SpringUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttUtil {
private static final Logger log = LoggerFactory.getLogger(MqttUtils.class);
public MqttUtils() {
}
public static IMqttClient getClient() {
IMqttClient client = (IMqttClient) SpringUtil.getBean(IMqttClient.class);
MqttConnectOptions options = (MqttConnectOptions)SpringUtil.getBean(MqttConnectOptions.class);
if (!client.isConnected()) {
log.info("client:" + client.getClientId() + "未连接,初始化连接");
try {
client.connect(options);
} catch (MqttException var3) {
throw new RuntimeException("mqtt客户端连接失败", var3);
}
}
return client;
}
public static boolean publish(String topic, String message) {
try {
getClient().publish(topic, new MqttMessage(message.getBytes()));
return true;
} catch (MqttException var3) {
log.error("mqtt-message 发送失败", var3);
return false;
}
}
public static boolean subscribe(String topic, IMqttMessageListener listener) {
try {
getClient().subscribe(topic, listener);
return true;
} catch (MqttException var3) {
log.error("客户端订阅{0}失败", topic);
return false;
}
}
}
SpringUtil 工具类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.lang.Nullable;
public class SpringUtil implements ApplicationContextAware {
private static final Logger log = LoggerFactory.getLogger(SpringUtil.class);
private static ConfigurableApplicationContext context;
public static DefaultListableBeanFactory getBeanFactory() {
return (DefaultListableBeanFactory)context.getBeanFactory();
}
public static <T> T getBean(Class<T> clazz) {
return clazz == null ? null : context.getBean(clazz);
}
public static <T> T getBean(String beanId) {
return beanId == null ? null : (T) context.getBean(beanId);
}
public static <T> T getBean(String beanName, Class<T> clazz) {
if (null != beanName && !"".equals(beanName.trim())) {
return clazz == null ? null : context.getBean(beanName, clazz);
} else {
return null;
}
}
public static ApplicationContext getContext() {
return context == null ? null : context;
}
public static void publishEvent(ApplicationEvent event) {
if (context != null) {
try {
context.publishEvent(event);
} catch (Exception var2) {
log.error(var2.getMessage());
}
}
}
public static void registerBeanDefinition(String beanName, BeanDefinition definition) {
getBeanFactory().registerBeanDefinition(beanName, definition);
}
public void setApplicationContext(@Nullable ApplicationContext context) throws BeansException {
SpringUtil.context = (ConfigurableApplicationContext)context;
}
public static String getProperty(String prop) {
return context.getEnvironment().getProperty(prop);
}
}