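Overview
The previous article, "open62541 Publish/Subscribe (over UDP)", covered brokerless publish/subscribe. This article shows how to implement publish/subscribe over MQTT. Background on MQTT is easy to find online and is not repeated here; only the steps needed to get things working are described. We start with installing and using an MQTT broker on Windows.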
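Using mosquitto on Windows
1. Download mosquitto
Link: https://mosquitto.org/download/
2. Install
When the installation finishes, open the installation directory.
3. Configure
Open a command prompt in this directory:
type cmd and press Enter, and a command prompt window opens.
- Set a username and password
- Set the port
4. Use
The topic name topic1 is used as the example here.
First start mosquitto:
Subscriber:
Publisher:
The result is as follows:
For local testing, the username and password at the end of the commands can be omitted.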
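Using MQTTX
1. Overview
MQTTX is an open-source, cross-platform MQTT 5.0 client. It runs on macOS, Linux, and Windows, supports converting MQTT message formats, and lets you quickly create and save connections and keep several client connections open at the same time. Its basic use is outlined below.
2. Install
Download: https://mqttx.app/
3. Use
- Connect
Open MQTTX and create a new connection.
Fill in the connection details in the window that appears,
then connect.
The connection is now established, and we can subscribe to a topic or publish a message to one. Add a subscription as follows:
Set the name of the topic to subscribe to, here testtopic, then use mosquitto_pub.exe to publish a message to the testtopic topic.
As you can see, the MQTTX client receives the message. To publish, just specify the target topic and the payload,
and the message is sent. Subscribing to testtopic with mosquitto_sub.exe receives it as well; that is not shown here.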
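Configuring pthread on Windows
The MQTT publishing code in open62541 relies on pthread functions, so pthread has to be set up first.
1. Download the library
Link: http://sourceware.org/pub/pthreads-win32/pthreads-w32-2-9-1-release.zip
2. Add the library to VS
Unpack the downloaded archive.
What we need is the contents of the Pre-built.2 folder: dll, include, and lib.
Copy all .h files from include into the include directory under the VC folder of the VS installation directory.
Copy the x64 and x86 folders from lib into the lib directory under the VC folder of the VS installation directory.
Copy all DLLs from dll\x64 into C:\Windows\System32 (64-bit runtime libraries), and all DLLs from dll\x86 into C:\Windows\SysWOW64 (32-bit runtime libraries).
That completes the basic environment setup. Next comes the main topic: implementing MQTT-based publishing in code.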
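Before moving on, it can help to confirm that the pthread headers and library are actually picked up by the compiler. The following is a minimal sketch, not part of open62541: the helper name pthread_smoke_test is hypothetical, and the pragma path simply mirrors the one used in the publisher example later in this article (for a 64-bit build the x64 folder would presumably be used instead).

```c
#include <pthread.h>
#include <stddef.h>

/* With MSVC, link the import library that was copied into the VC lib
 * directory. Assumption: a 32-bit build, as in the publisher example. */
#ifdef _MSC_VER
#pragma comment(lib, "x86/pthreadVC2.lib")
#endif

/* Thread entry point: writes a marker value through the argument pointer. */
static void *worker(void *arg) {
    int *out = (int *)arg;
    *out = 42;
    return NULL;
}

/* Hypothetical helper: returns 0 if a thread could be created and joined
 * and it produced the expected marker value, -1 otherwise. */
int pthread_smoke_test(void) {
    pthread_t tid;
    int value = 0;
    if(pthread_create(&tid, NULL, worker, &value) != 0)
        return -1;
    if(pthread_join(tid, NULL) != 0)
        return -1;
    return value == 42 ? 0 : -1;
}
```

Calling pthread_smoke_test() from a small main and checking for 0 is enough to show that the include path, the import library, and the DLL are all found.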
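Publishing over MQTT
- Build open62541
In CMake, enable the four options UA_ENABLE_AMALGAMATION, UA_ENABLE_PUBSUB, UA_ENABLE_JSON_ENCODING, and UA_ENABLE_PUBSUB_MQTT.
Click Generate, then build the generated project to produce open62541.h and open62541.c.
- publish
The example tutorial_pubsub_mqtt_publish.c that ships with open62541 is used here. The code is as follows: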
#include "open62541.h"
#include "ua_network_pubsub_mqtt.h"
#include <signal.h>
#pragma comment(lib, "x86/pthreadVC2.lib")
#define CONNECTION_NAME "MQTT Publisher Connection"
#define TRANSPORT_PROFILE_URI "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt"
#define MQTT_CLIENT_ID "TESTCLIENTPUBSUBMQTT"
#define CONNECTIONOPTION_NAME "mqttClientId"
#define PUBLISHER_TOPIC "customTopic"
#define PUBLISHER_METADATAQUEUENAME "MetaDataTopic"
#define PUBLISHER_METADATAUPDATETIME 0
#define BROKER_ADDRESS_URL "opc.mqtt://127.0.0.1:1883"
#define PUBLISH_INTERVAL 500
// Uncomment the following line to enable MQTT login for the example
// #define EXAMPLE_USE_MQTT_LOGIN
#ifdef EXAMPLE_USE_MQTT_LOGIN
#define USERNAME_OPTION_NAME "mqttUsername"
#define PASSWORD_OPTION_NAME "mqttPassword"
#define MQTT_USERNAME "open62541user"
#define MQTT_PASSWORD "open62541"
#endif
// Uncomment the following line to enable MQTT via TLS for the example
// #define EXAMPLE_USE_MQTT_TLS
#ifdef EXAMPLE_USE_MQTT_TLS
#define USE_TLS_OPTION_NAME "mqttUseTLS"
#define MQTT_CA_FILE_PATH_OPTION_NAME "mqttCaFilePath"
#define CA_FILE_PATH "/path/to/server.cert"
#endif
#ifdef UA_ENABLE_JSON_ENCODING
static UA_Boolean useJson = true;
#else
static UA_Boolean useJson = false;
#endif
static UA_NodeId connectionIdent;
static UA_NodeId publishedDataSetIdent;
static UA_NodeId writerGroupIdent;
static void
addPubSubConnection(UA_Server *server, char *addressUrl) {
/* Details about the connection configuration and handling are located
* in the pubsub connection tutorial */
UA_PubSubConnectionConfig connectionConfig;
memset(&connectionConfig, 0, sizeof(connectionConfig));
connectionConfig.name = UA_STRING(CONNECTION_NAME);
connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI);
connectionConfig.enabled = UA_TRUE;
/* configure address of the mqtt broker (local on default port) */
UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(addressUrl)};
UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
/* Changed to static publisherId from random generation to identify
* the publisher on Subscriber side */
connectionConfig.publisherId.numeric = 2234;
/* configure options, set mqtt client id */
const int connectionOptionsCount = 1
#ifdef EXAMPLE_USE_MQTT_LOGIN
+ 2
#endif
#ifdef EXAMPLE_USE_MQTT_TLS
+ 2
#endif
;
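/* Sized for the maximum number of options (1 client id + 2 login + 2 TLS).
 * A fixed size is used because MSVC does not support variable-length arrays. */
UA_KeyValuePair connectionOptions[5];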
size_t connectionOptionIndex = 0;
connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);
#ifdef EXAMPLE_USE_MQTT_LOGIN
connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, USERNAME_OPTION_NAME);
UA_String mqttUsername = UA_STRING(MQTT_USERNAME);
UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttUsername, &UA_TYPES[UA_TYPES_STRING]);
connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, PASSWORD_OPTION_NAME);
UA_String mqttPassword = UA_STRING(MQTT_PASSWORD);
UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttPassword, &UA_TYPES[UA_TYPES_STRING]);
#endif
#ifdef EXAMPLE_USE_MQTT_TLS
connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, USE_TLS_OPTION_NAME);
UA_Boolean mqttUseTLS = true;
UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttUseTLS, &UA_TYPES[UA_TYPES_BOOLEAN]);
connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, MQTT_CA_FILE_PATH_OPTION_NAME);
UA_String mqttCaFile = UA_STRING(CA_FILE_PATH);
UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttCaFile, &UA_TYPES[UA_TYPES_STRING]);
#endif
connectionConfig.connectionProperties = connectionOptions;
connectionConfig.connectionPropertiesSize = connectionOptionIndex;
UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
}
/**
* **PublishedDataSet handling**
* The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and
* can exist alone. The PDS contains the collection of the published fields. All
* other PubSub elements are directly or indirectly linked with the PDS or
* connection.
*/
static void
addPublishedDataSet(UA_Server *server) {
/* The PublishedDataSetConfig contains all necessary public
* information for the creation of a new PublishedDataSet */
UA_PublishedDataSetConfig publishedDataSetConfig;
memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
publishedDataSetConfig.name = UA_STRING("Demo PDS");
/* Create new PublishedDataSet based on the PublishedDataSetConfig. */
UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
}
/**
* **DataSetField handling**
* The DataSetField (DSF) is part of the PDS and describes exactly one published field.
*/
static void
addDataSetField(UA_Server *server) {
/* Add a field to the previous created PublishedDataSet */
UA_DataSetFieldConfig dataSetFieldConfig;
memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, NULL);
}
/**
* **WriterGroup handling**
* The WriterGroup (WG) is part of the connection and contains the primary configuration
* parameters for the message creation.
*/
static UA_StatusCode
addWriterGroup(UA_Server *server, char *topic, int interval) {
UA_StatusCode retval = UA_STATUSCODE_GOOD;
/* Now we create a new WriterGroupConfig and add the group to the existing PubSubConnection. */
UA_WriterGroupConfig writerGroupConfig;
memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
writerGroupConfig.name = UA_STRING("Demo WriterGroup");
writerGroupConfig.publishingInterval = interval;
writerGroupConfig.enabled = UA_FALSE;
writerGroupConfig.writerGroupId = 100;
UA_UadpWriterGroupMessageDataType *writerGroupMessage = NULL;
/* decide whether to use JSON or UADP encoding*/
#ifdef UA_ENABLE_JSON_ENCODING
UA_JsonWriterGroupMessageDataType *Json_writerGroupMessage = NULL;
if(useJson) {
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONWRITERGROUPMESSAGEDATATYPE];
/* The configuration flags for the messages are encapsulated inside the
* message- and transport settings extension objects. These extension
* objects are defined by the standard. e.g.
* UadpWriterGroupMessageDataType */
Json_writerGroupMessage = UA_JsonWriterGroupMessageDataType_new();
/* Change message settings of writerGroup to send PublisherId,
* DataSetMessageHeader, SingleDataSetMessage and DataSetClassId in PayloadHeader
* of NetworkMessage */
Json_writerGroupMessage->networkMessageContentMask =
(UA_JsonNetworkMessageContentMask)(UA_JSONNETWORKMESSAGECONTENTMASK_NETWORKMESSAGEHEADER |
(UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_DATASETMESSAGEHEADER |
(UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_SINGLEDATASETMESSAGE |
(UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_PUBLISHERID |
(UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_DATASETCLASSID);
writerGroupConfig.messageSettings.content.decoded.data = Json_writerGroupMessage;
}
else
#endif
{
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
/* The configuration flags for the messages are encapsulated inside the
* message- and transport settings extension objects. These extension
* objects are defined by the standard. e.g.
* UadpWriterGroupMessageDataType */
writerGroupMessage = UA_UadpWriterGroupMessageDataType_new();
/* Change message settings of writerGroup to send PublisherId,
* WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
* of NetworkMessage */
writerGroupMessage->networkMessageContentMask =
(UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
}
/* configure the mqtt publish topic */
UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
/* Assign the Topic at which MQTT publish should happen */
/*ToDo: Pass the topic as argument from the writer group */
brokerTransportSettings.queueName = UA_STRING(topic);
brokerTransportSettings.resourceUri = UA_STRING_NULL;
brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
/* Choose the QOS Level for MQTT */
brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
/* Encapsulate config in transportSettings */
UA_ExtensionObject transportSettings;
memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
transportSettings.content.decoded.data = &brokerTransportSettings;
writerGroupConfig.transportSettings = transportSettings;
retval = UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
if (retval == UA_STATUSCODE_GOOD)
UA_Server_setWriterGroupOperational(server, writerGroupIdent);
#ifdef UA_ENABLE_JSON_ENCODING
if (useJson) {
UA_JsonWriterGroupMessageDataType_delete(Json_writerGroupMessage);
}
#endif
if (!useJson && writerGroupMessage) {
UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
}
return retval;
}
/**
* **DataSetWriter handling**
* A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is
* linked to exactly one PDS and contains additional information for the
* message generation.
*/
static void
addDataSetWriter(UA_Server *server, char *topic) {
/* We now need a DataSetWriter within the WriterGroup. This means we must
* create a new DataSetWriterConfig and call the addDataSetWriter function. */
UA_NodeId dataSetWriterIdent;
UA_DataSetWriterConfig dataSetWriterConfig;
memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
dataSetWriterConfig.dataSetWriterId = 62541;
dataSetWriterConfig.keyFrameCount = 10;
#ifdef UA_ENABLE_JSON_ENCODING
UA_JsonDataSetWriterMessageDataType jsonDswMd;
UA_ExtensionObject messageSettings;
if(useJson) {
/* JSON config for the dataSetWriter */
jsonDswMd.dataSetMessageContentMask = (UA_JsonDataSetMessageContentMask)
(UA_JSONDATASETMESSAGECONTENTMASK_DATASETWRITERID |
UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER |
UA_JSONDATASETMESSAGECONTENTMASK_STATUS |
UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION |
UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP);
messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE];
messageSettings.content.decoded.data = &jsonDswMd;
dataSetWriterConfig.messageSettings = messageSettings;
}
#endif
/*TODO: Modify MQTT send to add DataSetWriters broker transport settings */
/*TODO: Pass the topic as argument from the writer group */
/*TODO: Publish Metadata to metaDataQueueName */
/* configure the mqtt publish topic */
UA_BrokerDataSetWriterTransportDataType brokerTransportSettings;
memset(&brokerTransportSettings, 0, sizeof(UA_BrokerDataSetWriterTransportDataType));
/* Assign the Topic at which MQTT publish should happen */
brokerTransportSettings.queueName = UA_STRING(topic);
brokerTransportSettings.resourceUri = UA_STRING_NULL;
brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
brokerTransportSettings.metaDataQueueName = UA_STRING(PUBLISHER_METADATAQUEUENAME);
brokerTransportSettings.metaDataUpdateTime = PUBLISHER_METADATAUPDATETIME;
/* Choose the QOS Level for MQTT */
brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
/* Encapsulate config in transportSettings */
UA_ExtensionObject transportSettings;
memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERDATASETWRITERTRANSPORTDATATYPE];
transportSettings.content.decoded.data = &brokerTransportSettings;
dataSetWriterConfig.transportSettings = transportSettings;
UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
&dataSetWriterConfig, &dataSetWriterIdent);
}
/**
* That's it! You're now publishing the selected fields. Open a packet
* inspection tool of trust e.g. wireshark and take a look on the outgoing
* packages. The following graphic figures out the packages created by this
* tutorial.
*
* .. figure:: ua-wireshark-pubsub.png
* :figwidth: 100 %
* :alt: OPC UA PubSub communication in wireshark
*
* The open62541 subscriber API will be released later. If you want to process
* the datagrams, take a look on the ua_network_pubsub_networkmessage.c
* which already contains the decoding code for UADP messages.
*
* It follows the main server code, making use of the above definitions. */
UA_Boolean running = true;
static void stopHandler(int sign) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
running = false;
}
static void usage(void) {
printf("Usage: tutorial_pubsub_mqtt_publish [--url <opc.mqtt://hostname:port>] "
"[--topic <mqttTopic>] "
"[--freq <frequency in ms>] "
"[--json]\n"
" Defaults are:\n"
" - Url: opc.mqtt://127.0.0.1:1883\n"
" - Topic: customTopic\n"
" - Frequency: 500\n"
" - JSON: Off\n");
}
int main(int argc, char **argv) {
signal(SIGINT, stopHandler);
signal(SIGTERM, stopHandler);
/* TODO: Change to secure mqtt port:8883 */
char *addressUrl = BROKER_ADDRESS_URL;
char *topic = PUBLISHER_TOPIC;
int interval = PUBLISH_INTERVAL;
/* Parse arguments */
for(int argpos = 1; argpos < argc; argpos++) {
if(strcmp(argv[argpos], "--help") == 0) {
usage();
return 0;
}
if(strcmp(argv[argpos], "--json") == 0) {
useJson = true;
continue;
}
if(strcmp(argv[argpos], "--url") == 0) {
if(argpos + 1 == argc) {
usage();
return -1;
}
argpos++;
addressUrl = argv[argpos];
continue;
}
if(strcmp(argv[argpos], "--topic") == 0) {
if(argpos + 1 == argc) {
usage();
return -1;
}
argpos++;
topic = argv[argpos];
continue;
}
if(strcmp(argv[argpos], "--freq") == 0) {
if(argpos + 1 == argc) {
usage();
return -1;
}
/* advance to the value that follows --freq before parsing it */
if(sscanf(argv[++argpos], "%d", &interval) != 1) {
usage();
return -1;
}
if(interval <= 10) {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Publication interval too small");
return -1;
}
continue;
}
usage();
return -1;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
/* Set up the server config */
UA_Server *server = UA_Server_new();
UA_ServerConfig *config = UA_Server_getConfig(server);
/* Details about the connection configuration and handling are located in
* the pubsub connection tutorial */
UA_ServerConfig_setDefault(config);
UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerMQTT());
addPubSubConnection(server, addressUrl);
addPublishedDataSet(server);
addDataSetField(server);
retval = addWriterGroup(server, topic, interval);
if (UA_STATUSCODE_GOOD != retval)
{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Error Name = %s", UA_StatusCode_name(retval));
return EXIT_FAILURE;
}
addDataSetWriter(server, topic);
//UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, connectionIdent);
//if(!connection) {
// UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
// "Could not create a PubSubConnection");
// UA_Server_delete(server);
// return -1;
//}
UA_Server_run(server, &running);
UA_Server_delete(server);
return 0;
}
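The code is largely unchanged from the example: the pthread library is linked in, and the block around UA_PubSubConnection_findConnectionbyId(server, connectionIdent); is commented out. That function is implemented inside open62541.c, so the build fails if the call is left in. Commenting it out has little effect; the same check can be made with the return value of UA_Server_addPubSubConnection, which is called inside addPubSubConnection(server, addressUrl);. The flow is essentially the same as for UDP-based publish/subscribe; see "open62541 Publish/Subscribe (over UDP)". The rest of this section covers the files and changes needed to build.
Quite a few files are needed; adding just the .h and .c is not enough to build and run. Take a look at the project directory.
mqtt.c, ua_mqtt_adapter.c, ua_mqtt_pal.c, and ua_network_pubsub_mqtt.c have to be compiled together with the project. The matching headers mqtt.h, ua_mqtt_adapter.h, ua_mqtt_pal.h, and ua_network_pubsub_mqtt.h can all be found in the open62541 sources. Put the required .h files in the project directory, then adjust the #include lines in those files accordingly,
The other .h and .c files need similar changes, which are not repeated here. When compiling, mqtt.c reports the following error,
This is probably caused by the build options: TLS is not used, so some parameters are not defined. Commenting out the if(useTLS) block and rebuilding lets the build succeed.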
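An alternative to commenting code out by hand is to guard the TLS-only path with a preprocessor switch, so the same source builds with or without TLS. The sketch below only illustrates that pattern. It is not taken from mqtt.c: the macro EXAMPLE_WITH_TLS and the functions example_tls_setup and example_connect are hypothetical names.

```c
#include <stdio.h>

/* Hypothetical switch: define EXAMPLE_WITH_TLS (for instance with a
 * compiler flag) only when the TLS dependencies are available. */
#ifdef EXAMPLE_WITH_TLS
static int example_tls_setup(void) {
    /* TLS-specific setup would go here. */
    return 0;
}
#endif

/* Returns 0 on success, -1 if TLS was requested but not compiled in. */
int example_connect(int useTLS) {
    if(useTLS) {
#ifdef EXAMPLE_WITH_TLS
        return example_tls_setup();
#else
        fprintf(stderr, "TLS requested but not compiled in\n");
        return -1;
#endif
    }
    /* Plain (non-TLS) connection path. */
    return 0;
}
```

With this shape the non-TLS build never references the undefined TLS symbols, and a caller that asks for TLS gets an explicit error instead of a silent fallback.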
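This project publishes the server's system time to the broker at 127.0.0.1, port 1883, under the topic customTopic. Run it and use MQTTX to check the result.
As shown, subscribing to the customTopic topic with both MQTTX and mosquitto_sub.exe receives the published system time.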
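The broker address is passed to the example as a single URL of the form opc.mqtt://host:port. To make that format explicit, here is a small sketch that splits such a URL into host and port. The helper split_broker_url is hypothetical and is not how open62541 parses the address internally; it only handles the plain host:port shape used in this article.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: splits "opc.mqtt://host:port" into host and port.
 * Returns 0 on success, -1 if the URL does not match that shape. */
int split_broker_url(const char *url, char *host, size_t hostLen, int *port) {
    const char *prefix = "opc.mqtt://";
    size_t prefixLen = strlen(prefix);
    if(strncmp(url, prefix, prefixLen) != 0)
        return -1;

    /* Everything after the scheme; the last ':' separates host and port. */
    const char *rest = url + prefixLen;
    const char *colon = strrchr(rest, ':');
    if(!colon || colon == rest)
        return -1;

    size_t len = (size_t)(colon - rest);
    if(len >= hostLen)
        return -1;

    /* Accept the port only if it is a number with nothing trailing it. */
    int parsed = 0;
    char trailing;
    if(sscanf(colon + 1, "%d%c", &parsed, &trailing) != 1)
        return -1;
    if(parsed <= 0 || parsed > 65535)
        return -1;

    memcpy(host, rest, len);
    host[len] = '\0';
    *port = parsed;
    return 0;
}
```

For the default URL in the example, opc.mqtt://127.0.0.1:1883, this yields the host 127.0.0.1 and the port 1883, which are the values to enter in MQTTX or pass to mosquitto_sub.exe.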
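Conclusion
Compared with UDP-based publish/subscribe, the main difficulty here is the environment setup; the remaining steps are similar. Note that UA_ENABLE_JSON_ENCODING must be enabled when building open62541. Otherwise the published messages appear garbled when subscribed to in MQTTX.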
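The garbled output is most likely a matter of encoding: without JSON the example falls back to UADP, which is a binary format, and a client that renders payloads as text cannot display it meaningfully. As a rough illustration, the hypothetical helper below checks whether a received payload consists only of printable ASCII and common whitespace. It is a heuristic sketch for debugging, not part of open62541 or MQTTX.

```c
#include <ctype.h>
#include <stddef.h>

/* Hypothetical helper: returns 1 if every byte is printable ASCII or
 * common whitespace (text such as JSON), 0 otherwise (likely binary). */
int payload_is_plain_text(const unsigned char *data, size_t len) {
    for(size_t i = 0; i < len; i++) {
        unsigned char c = data[i];
        if(!isprint(c) && c != '\n' && c != '\r' && c != '\t')
            return 0;
    }
    return 1;
}
```

A JSON-encoded message passes this check, while a UADP message will normally fail it on the first non-printable byte.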