0
点赞
收藏
分享

微信扫一扫

MQTT通信

1.mqtt 模型

MQTT 是物联网中常用的一种通信方式,主要由服务器(broker)、订阅者、发布者三部分组成

订阅者向服务器subscribe某个主题,发布者在有消息更新时将消息publish给服务器,再由服务器转发给相关订阅者

MQTT通信_物联网

整个过程中主要有两个概念:

  • 主题(topic)
  • 消息message 或者payload

2.主要的流程

  • 订阅者connect到server
  • 订阅者subscribe某个topic
  • 发布者connect 到server
  • 发布者将某个topic的消息publish给server
  • server 又将这个topic的消息publish给相关订阅者

整个过程中发布者和订阅者会每隔一段时间向server确认状态,看对方是否还活着

3.mqtt-源码分析

(参考大牛的源码:https://github.com/jiejieTop/mqttclient)

3.1 主函数main

  • 创建mqtt 对象
  • 设置相关属性 ip port 等
  • 连接服务器
  • 创建处理publish事件的线程

/*
 * Test program entry point (excerpt; parts are elided with "......" and the
 * end of the function is not shown).
 * Generates random credentials, obtains a client object, configures it,
 * connects, subscribes to three topics and starts a publisher thread.
 */
int main(void)
{
int res;
pthread_t thread1;
mqtt_client_t *client = NULL;
char client_id[32];
char user_name[32];
char password[32];

printf("\nwelcome to mqttclient test...\n");

random_string(client_id, 10);
random_string(user_name, 10);
random_string(password, 10);

mqtt_log_init();
// obtain an mqtt client object
client = mqtt_lease();
......
// configure the broker host, client id, credentials and clean-session flag
mqtt_set_host(client, "120.25.213.14");
mqtt_set_client_id(client, client_id);
mqtt_set_user_name(client, user_name);
mqtt_set_password(client, password);
mqtt_set_clean_session(client, 1);
// connect to the server
mqtt_connect(client);
// subscribe to the topics of interest (topic2/topic3 pass no handler)
mqtt_subscribe(client, "topic1", QOS0, topic1_handler);
mqtt_subscribe(client, "topic2", QOS1, NULL);
mqtt_subscribe(client, "topic3", QOS2, NULL);
// create the thread that runs mqtt_publish_thread with the client as its argument
res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);

3.2连接服务器相关处理逻辑

  • 初始化网络
  • 设置connect_data参数
  • 连接服务器
  • 创建一个线程(mqtt_yield_thread)用来处理服务器发来的消息


/*
 * Connect the client to the server and wait for the CONNACK.
 *
 * Steps: initialise and open the network connection, fill in connect_data
 * from the client settings, serialize and send the CONNECT packet, then wait
 * for CONNACK. On success the client state becomes CLIENT_STATE_CONNECTED and,
 * if no yield thread exists yet, one is created and started. On failure the
 * network is released and the state is set to CLIENT_STATE_INITIALIZED.
 *
 * Returns MQTT_SUCCESS_ERROR on success; otherwise the error from
 * network_connect()/mqtt_send_packet(), the return code carried in the
 * CONNACK, or MQTT_CONNECT_FAILED_ERROR.
 */
static int mqtt_connect_with_results(mqtt_client_t* c)
{
    int len = 0;
    int rc = MQTT_CONNECT_FAILED_ERROR;
    /* declarations of connect_data, connack_data and connect_timer are elided in this excerpt */
....

#ifndef MQTT_NETWORK_TYPE_NO_TLS
    rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, c->mqtt_ca);
#else
    /* initialise the network; no CA is passed when TLS is disabled */
    rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
#endif
    /* NOTE(review): the result of network_init() is overwritten below without
     * being checked - confirm whether it can fail and should abort here. */

    /* open the network connection */
    rc = network_connect(c->mqtt_network);
    if (MQTT_SUCCESS_ERROR != rc) {
        if (NULL != c->mqtt_network) {
            network_release(c->mqtt_network);
        }
        /* always bail out on a failed connect, even when there is no network object to release */
        RETURN_ERROR(rc);
    }

    /* fill in connect_data from the client configuration */
    connect_data.keepAliveInterval = c->mqtt_keep_alive_interval;
    connect_data.cleansession = c->mqtt_clean_session;
    connect_data.MQTTVersion = c->mqtt_version;
    connect_data.clientID.cstring = c->mqtt_client_id;
    connect_data.username.cstring = c->mqtt_user_name;
    connect_data.password.cstring = c->mqtt_password;

    if (c->mqtt_will_flag) {
        connect_data.willFlag = c->mqtt_will_flag;
        connect_data.will.message.cstring = c->mqtt_will_options->will_message;
        connect_data.will.qos = c->mqtt_will_options->will_qos;
        connect_data.will.retained = c->mqtt_will_options->will_retained;
        connect_data.will.topicName.cstring = c->mqtt_will_options->will_topic;
    }

    /* arm the last-received timer with the keep-alive interval (seconds -> ms) */
    platform_timer_cutdown(&c->mqtt_last_received, (c->mqtt_keep_alive_interval * 1000));

    platform_mutex_lock(&c->mqtt_write_lock);

    /* serialize connect packet */
    len = MQTTSerialize_connect(c->mqtt_write_buf, c->mqtt_write_buf_size, &connect_data);
    if (len <= 0) {
        /* rc still holds the success value from network_connect(); without
         * this assignment the exit path would treat the failure as a
         * successful connect */
        rc = MQTT_CONNECT_FAILED_ERROR;
        goto exit;
    }

    platform_timer_cutdown(&connect_timer, c->mqtt_cmd_timeout);

    /* send connect packet */
    if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
        goto exit;

    /* wait for the server's CONNACK */
    if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {
        if (MQTTDeserialize_connack(&connack_data.session_present, &connack_data.rc, c->mqtt_read_buf, c->mqtt_read_buf_size) == 1)
            rc = connack_data.rc;
        else
            rc = MQTT_CONNECT_FAILED_ERROR;
    } else {
        rc = MQTT_CONNECT_FAILED_ERROR;
    }

exit:
    if (rc == MQTT_SUCCESS_ERROR) {
        if (NULL == c->mqtt_thread) {

            /* connect success, and need init mqtt thread */
            c->mqtt_thread = platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK);

            if (NULL != c->mqtt_thread) {
                mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
                platform_thread_startup(c->mqtt_thread);
                platform_thread_start(c->mqtt_thread); /* start run mqtt thread */
            } else {
                /* creating the thread failed: drop the network connection */
                network_release(c->mqtt_network);
                rc = MQTT_CONNECT_FAILED_ERROR;
                MQTT_LOG_W("%s:%d %s()... mqtt yield thread creat failed...", __FILE__, __LINE__, __FUNCTION__);
            }
        } else {
            mqtt_set_client_state(c, CLIENT_STATE_CONNECTED); /* reconnect, mqtt thread is already exists */
        }

        c->mqtt_ping_outstanding = 0; /* reset ping outstanding */

    } else {
        network_release(c->mqtt_network);
        mqtt_set_client_state(c, CLIENT_STATE_INITIALIZED); /* connect failed */
    }

    platform_mutex_unlock(&c->mqtt_write_lock);

    RETURN_ERROR(rc);
}

3.3 连接时创建的线程分析

/*
 * Abbreviated view of the connect routine (most of the body is elided with
 * dotted lines): it shows only the network setup and the point where the
 * "mqtt_yield_thread" thread is created with mqtt_yield_thread as its entry
 * function and the client as its argument.
 */
static int mqtt_connect_with_results(mqtt_client_t* c)
{
......
#ifndef MQTT_NETWORK_TYPE_NO_TLS
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, c->mqtt_ca);
#else
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
#endif

rc = network_connect(c->mqtt_network);

.......

/* connect success, and need init mqtt thread */
c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK);

RETURN_ERROR(rc);
}

client 在连接服务器的时候会在内部创建一个线程mqtt_yield_thread

/*
 * Entry function of the client's background thread.
 *
 * arg is the mqtt_client_t the thread serves. The thread calls mqtt_yield()
 * in a loop; when mqtt_yield() reports MQTT_CLEAN_SESSION_ERROR it
 * disconnects the network, cleans the session, clears c->mqtt_thread and
 * destroys the thread object. A reconnect timeout is only logged.
 */
static void mqtt_yield_thread(void *arg)
{
    mqtt_client_t *client = (mqtt_client_t *)arg;
    platform_thread_t *self = NULL;
    int result;

    /* check the client state once before entering the loop */
    if (mqtt_get_client_state(client) != CLIENT_STATE_CONNECTED) {
        MQTT_LOG_W("%s:%d %s()..., mqtt is not connected to the server...", __FILE__, __LINE__, __FUNCTION__);
        platform_thread_stop(client->mqtt_thread); /* mqtt is not connected to the server, stop thread */
    }

    for (;;) {
        /* all packet processing happens inside mqtt_yield() */
        result = mqtt_yield(client, client->mqtt_cmd_timeout);

        if (MQTT_CLEAN_SESSION_ERROR == result) {
            MQTT_LOG_W("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__);
            network_disconnect(client->mqtt_network);
            mqtt_clean_session(client);
            break; /* leave the loop and tear the thread down */
        }

        if (MQTT_RECONNECT_TIMEOUT_ERROR == result) {
            MQTT_LOG_W("%s:%d %s()..., mqtt reconnect timeout....", __FILE__, __LINE__, __FUNCTION__);
        }
    }

    /* clear the client's reference before destroying the thread object */
    self = client->mqtt_thread;
    client->mqtt_thread = (platform_thread_t *)0;
    platform_thread_destroy(self);
}

/*
 * Process client work for up to timeout_ms milliseconds.
 *
 * c          - client to service; NULL yields MQTT_FAILED_ERROR.
 * timeout_ms - time budget; 0 means "use c->mqtt_cmd_timeout".
 *
 * While the budget has not expired: return MQTT_CLEAN_SESSION_ERROR if the
 * client is in the clean-session state, try to reconnect if it is not
 * connected, otherwise handle one packet and scan the ack list.
 * Returns the last result code obtained.
 */
static int mqtt_yield(mqtt_client_t* c, int timeout_ms)
{
    platform_timer_t deadline;
    client_state_t current;
    int result = MQTT_SUCCESS_ERROR;

    if (NULL == c) {
        RETURN_ERROR(MQTT_FAILED_ERROR);
    }

    platform_timer_init(&deadline);
    platform_timer_cutdown(&deadline, ((0 != timeout_ms) ? timeout_ms : c->mqtt_cmd_timeout));

    while (!platform_timer_is_expired(&deadline)) {
        current = mqtt_get_client_state(c);

        if (CLIENT_STATE_CLEAN_SESSION == current) {
            RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
        } else if (CLIENT_STATE_CONNECTED != current) {
            /* mqtt not connect, need reconnect */
            result = mqtt_try_reconnect(c);
            if (MQTT_RECONNECT_TIMEOUT_ERROR == result) {
                RETURN_ERROR(result);
            }
            continue;
        }

        /* mqtt connected, handle mqtt packet */
        result = mqtt_packet_handle(c, &deadline);

        if (result >= 0) {
            /* scan ack list, destroy ack handler that have timed out or resend them */
            mqtt_ack_list_scan(c, 1);
        } else if (MQTT_NOT_CONNECT_ERROR == result) {
            MQTT_LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__);
        } else {
            break; /* any other negative result ends this yield call */
        }
    }

    RETURN_ERROR(result);
}

在mqtt_yield里面会循环处理server 的packet数据,并且会每隔一段时间发送ping,以确认双方都alive

/*
 * Read one packet and dispatch it by type.
 *
 * c     - client whose connection is read.
 * timer - timer handed on to mqtt_read_packet() and the per-type handlers.
 *
 * After every handled type except CONNACK and unknown types, mqtt_keep_alive()
 * is run and its result replaces the handler's result. If the final result is
 * MQTT_SUCCESS_ERROR, the packet type is returned instead.
 */
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
{
    int type = 0;
    int run_keep_alive = 1;
    int result = mqtt_read_packet(c, &type, timer);

    switch (type) {
    case 0: /* timed out reading packet or an error occurred while reading data*/
        if (MQTT_BUFFER_TOO_SHORT_ERROR == result) {
            MQTT_LOG_E("the client read buffer is too short, please call mqtt_set_read_buf_size() to reset the buffer size");
            /* don't return directly, you need to stay active, because there is data readable now, but the buffer is too small */
        }
        break;

    case PUBACK:
    case PUBCOMP:
        result = mqtt_puback_and_pubcomp_packet_handle(c, timer);
        break;

    case SUBACK:
        result = mqtt_suback_packet_handle(c, timer);
        break;

    case UNSUBACK:
        result = mqtt_unsuback_packet_handle(c, timer);
        break;

    case PUBLISH:
        result = mqtt_publish_packet_handle(c, timer);
        break;

    case PUBREC:
    case PUBREL:
        result = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
        break;

    case PINGRESP:
        c->mqtt_ping_outstanding = 0; /* keep alive ping success */
        break;

    case CONNACK: /* has been processed */
    default:
        /* these types skip the keep-alive step */
        run_keep_alive = 0;
        break;
    }

    if (run_keep_alive) {
        result = mqtt_keep_alive(c);
    }

    if (MQTT_SUCCESS_ERROR == result) {
        result = type;
    }

    RETURN_ERROR(result);
}

/*
 * Keep-alive step.
 *
 * Returns the result of mqtt_is_connected() if the client is not connected.
 * Otherwise, once either the last-sent or the last-received timer has
 * expired:
 *   - if a ping is already outstanding, release the network, mark the client
 *     CLIENT_STATE_DISCONNECTED and return MQTT_NOT_CONNECT_ERROR;
 *   - else serialize and send a PINGREQ and count it as outstanding.
 */
int mqtt_keep_alive(mqtt_client_t* c)
{
    int result = mqtt_is_connected(c);

    if (MQTT_SUCCESS_ERROR != result) {
        RETURN_ERROR(result);
    }

    /* nothing to do while both timers are still running */
    if (!platform_timer_is_expired(&c->mqtt_last_sent) && !platform_timer_is_expired(&c->mqtt_last_received)) {
        RETURN_ERROR(result);
    }

    if (c->mqtt_ping_outstanding) {
        MQTT_LOG_W("%s:%d %s()... ping outstanding", __FILE__, __LINE__, __FUNCTION__);
        /* the socket must be released here */
        network_release(c->mqtt_network);

        mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
        result = MQTT_NOT_CONNECT_ERROR; /* PINGRESP not received in keepalive interval */
    } else {
        /* NOTE(review): ping_timer is passed without being armed here;
         * presumably mqtt_send_packet() sets it up - confirm. */
        platform_timer_t ping_timer;
        int ping_len = MQTTSerialize_pingreq(c->mqtt_write_buf, c->mqtt_write_buf_size);

        if (ping_len > 0) {
            result = mqtt_send_packet(c, ping_len, &ping_timer); /* send the ping packet */
        }
        /* counted as outstanding whether or not the ping was serialized/sent */
        c->mqtt_ping_outstanding++;
    }

    RETURN_ERROR(result);
}

3.4 client 订阅

订阅的时候一般会提供主题topic 和对应处理主题消息的message handler函数

mqtt_subscribe(client, "topic1", QOS0, topic1_handler);

/*
 * Subscribe to a topic filter.
 *
 * c            - connected client; otherwise MQTT_NOT_CONNECT_ERROR is returned.
 * topic_filter - topic filter string to subscribe to.
 * qos          - requested QoS.
 * handler      - callback for messages on this topic; NULL selects
 *                default_msg_handler.
 *
 * Under the write lock: serialize and send the SUBSCRIBE packet, create a
 * message handler record and register it in the ack list under SUBACK.
 * Returns MQTT_SUBSCRIBE_ERROR if serialization fails, the send error if the
 * send fails, MQTT_MEM_NOT_ENOUGH_ERROR if the record cannot be allocated,
 * otherwise the result of mqtt_ack_list_record().
 */
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
{
    int result = MQTT_SUBSCRIBE_ERROR;
    int pkt_len = 0;
    uint16_t pkt_id;
    platform_timer_t send_timer; /* NOTE(review): not armed here; presumably mqtt_send_packet() sets it up - confirm */
    message_handlers_t *record = NULL;
    MQTTString topic = MQTTString_initializer;

    topic.cstring = (char *)topic_filter;

    if (mqtt_get_client_state(c) != CLIENT_STATE_CONNECTED) {
        RETURN_ERROR(MQTT_NOT_CONNECT_ERROR);
    }

    platform_mutex_lock(&c->mqtt_write_lock);

    pkt_id = mqtt_get_next_packet_id(c);

    do {
        /* serialize subscribe packet and send it */
        pkt_len = MQTTSerialize_subscribe(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, pkt_id, 1, &topic, (int*)&qos);
        if (pkt_len <= 0) {
            break;
        }

        result = mqtt_send_packet(c, pkt_len, &send_timer);
        if (MQTT_SUCCESS_ERROR != result) {
            break;
        }

        /* if handler is not specified, the default handler is used */
        if (NULL == handler) {
            handler = default_msg_handler;
        }

        /* create a message and record it */
        record = mqtt_msg_handler_create(topic_filter, qos, handler);
        if (NULL == record) {
            result = MQTT_MEM_NOT_ENOUGH_ERROR;
            break;
        }

        /* NOTE(review): if recording fails, ownership of `record` is unclear from here - confirm it is not leaked */
        result = mqtt_ack_list_record(c, SUBACK, pkt_id, pkt_len, record);
    } while (0);

    platform_mutex_unlock(&c->mqtt_write_lock);

    RETURN_ERROR(result);
}

/*
 * Allocate and fill a message handler record.
 *
 * topic_filter - topic filter the record is for; the pointer itself is stored,
 *                the string is not copied here.
 * qos          - QoS stored in the record.
 * handler      - callback stored in the record.
 *
 * Returns the new record with its list node initialised, or NULL if
 * platform_memory_alloc() fails.
 */
static message_handlers_t *mqtt_msg_handler_create(const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
{
    message_handlers_t *entry = (message_handlers_t *) platform_memory_alloc(sizeof(*entry));

    if (NULL != entry) {
        mqtt_list_init(&entry->list);

        entry->topic_filter = topic_filter;
        entry->handler = handler; /* register callback handler */
        entry->qos = qos;
    }

    return entry;
}

4.总结

对应mqtt client 消息之间的通信就是一种subscribe 和publish机制

MQTT通信_初始化_02


  • client需要与server建立连接
  • 建立连接后会创建一个线程处理server端的消息和定时ping server端的状态
  • client 会向服务器订阅某个topic的消息
  • 订阅的过程中会将对应的topic 消息处理函数加入到message handler 链表中
  • 当服务端有消息过来的时候,会根据消息类型从链表中取出对应的message handler进行处理

这里只分析相关模型和流程,具体的协议后续再更吧。。。。。。。

举报

相关推荐

0 条评论