Eclipse Paho Java Client 是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android);本文主要介绍使用如何使用它来操作 EMQX,文中所使用到的软件版本:EMQX 4.2.2、Paho 1.2.5、Java 1.8.0_321。
1、引入依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
2、同步方式收发消息
2.1、发送消息
public static void publish() {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
*/
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
}
});
mqttClient.connect(mqttConnectOptions);
for (int i = 0; i < 10; i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
mqttClient.publish(TOPIC_NAME, mqttMessage);
}
mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}
2.2、接受消息
public static void subscribe(String clientId) {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
mqttClient.connect(mqttConnectOptions);
mqttClient.subscribe(TOPIC_NAME);
} catch (Exception e) {
e.printStackTrace();
}
}
2.3、完整例子
package com.abc.demo.emqx;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttClientCase {
private static Logger logger = LoggerFactory.getLogger(MqttClientCase.class.getName());
private static final String SERVER_URI = "tcp://10.49.196.10:1883";
private static final String TOPIC_NAME = "test-topic";
public static void main(String[] args) throws Exception {
new Thread(() -> subscribe("client-subscribe-A")).start();
new Thread(() -> subscribe("client-subscribe-B")).start();
Thread.sleep(1000);
new Thread(() -> publish()).start();
Thread.sleep(1000 * 60);
}
public static void publish() {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
*/
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
}
});
mqttClient.connect(mqttConnectOptions);
for (int i = 0; i < 10; i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
mqttClient.publish(TOPIC_NAME, mqttMessage);
}
mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void subscribe(String clientId) {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
mqttClient.connect(mqttConnectOptions);
mqttClient.subscribe(TOPIC_NAME);
} catch (Exception e) {
e.printStackTrace();
}
}
}
MqttClientCase.java
3、异步方式收发消息
3.1、发送消息
public static void publish() {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
*/
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
}
});
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
for (int i = 0; i < 10; i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage);
iMqttDeliveryToken.waitForCompletion();
}
mqttClient.disconnect().waitForCompletion();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}
3.2、接受消息
public static void subscribe(String clientId) {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
}
}
3.2、完整例子
package com.abc.demo.emqx;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttAsyncClientCase {
private static Logger logger = LoggerFactory.getLogger(MqttAsyncClientCase.class.getName());
private static final String SERVER_URI = "tcp://10.49.196.10:1883";
private static final String TOPIC_NAME = "test-topic";
public static void main(String[] args) throws Exception {
new Thread(() -> subscribe("client-subscribe-A")).start();
new Thread(() -> subscribe("client-subscribe-B")).start();
Thread.sleep(1000);
new Thread(() -> publish()).start();
Thread.sleep(1000 * 60);
}
public static void publish() {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "test-client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
*/
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
}
});
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
for (int i = 0; i < 10; i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage);
iMqttDeliveryToken.waitForCompletion();
}
mqttClient.disconnect().waitForCompletion();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void subscribe(String clientId) {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
}
}
}
MqttAsyncClientCase.java