0
点赞
收藏
分享

微信扫一扫

EMQX 入门实战(2)--MQTT Java 客户端库使用

​​Eclipse Paho Java Client​​ 是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android);本文主要介绍使用如何使用它来操作 EMQX,文中所使用到的软件版本:EMQX 4.2.2、Paho 1.2.5、Java 1.8.0_321。

1、引入依赖

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>

2、同步方式收发消息

2.1、发送消息

public static void publish() {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
*/
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
}
});
mqttClient.connect(mqttConnectOptions);
for (int i = 0; i < 10; i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
mqttClient.publish(TOPIC_NAME, mqttMessage);
}

mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}

2.2、接受消息

public static void subscribe(String clientId) {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
mqttClient.connect(mqttConnectOptions);
mqttClient.subscribe(TOPIC_NAME);
} catch (Exception e) {
e.printStackTrace();
}
}

2.3、完整例子

EMQX 入门实战(2)--MQTT Java 客户端库使用_服务器EMQX 入门实战(2)--MQTT Java 客户端库使用_客户端_02

package com.abc.demo.emqx;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttClientCase {
private static Logger logger = LoggerFactory.getLogger(MqttClientCase.class.getName());

private static final String SERVER_URI = "tcp://10.49.196.10:1883";
private static final String TOPIC_NAME = "test-topic";

public static void main(String[] args) throws Exception {
new Thread(() -> subscribe("client-subscribe-A")).start();
new Thread(() -> subscribe("client-subscribe-B")).start();
Thread.sleep(1000);
new Thread(() -> publish()).start();

Thread.sleep(1000 * 60);
}

public static void publish() {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
*/
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
}
});
mqttClient.connect(mqttConnectOptions);
for (int i = 0; i < 10; i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
mqttClient.publish(TOPIC_NAME, mqttMessage);
}

mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}

public static void subscribe(String clientId) {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
mqttClient.connect(mqttConnectOptions);
mqttClient.subscribe(TOPIC_NAME);
} catch (Exception e) {
e.printStackTrace();
}
}
}

MqttClientCase.java

3、异步方式收发消息

3.1、发送消息

public static void publish() {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
*/
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
}
});
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
for (int i = 0; i < 10; i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage);
iMqttDeliveryToken.waitForCompletion();
}

mqttClient.disconnect().waitForCompletion();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}

3.2、接受消息

public static void subscribe(String clientId) {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
}
}

3.2、完整例子

EMQX 入门实战(2)--MQTT Java 客户端库使用_服务器EMQX 入门实战(2)--MQTT Java 客户端库使用_客户端_02

package com.abc.demo.emqx;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttAsyncClientCase {
private static Logger logger = LoggerFactory.getLogger(MqttAsyncClientCase.class.getName());

private static final String SERVER_URI = "tcp://10.49.196.10:1883";
private static final String TOPIC_NAME = "test-topic";

public static void main(String[] args) throws Exception {
new Thread(() -> subscribe("client-subscribe-A")).start();
new Thread(() -> subscribe("client-subscribe-B")).start();
Thread.sleep(1000);
new Thread(() -> publish()).start();

Thread.sleep(1000 * 60);
}

public static void publish() {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "test-client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
*/
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
}
});
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
for (int i = 0; i < 10; i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage);
iMqttDeliveryToken.waitForCompletion();
}

mqttClient.disconnect().waitForCompletion();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}

public static void subscribe(String clientId) {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
}
}
}

MqttAsyncClientCase.java

 


举报

相关推荐

0 条评论