1.mqtt 模型
MQTT 是物联网中常用的一种通信协议,主要由服务器(broker)、订阅者、发布者三部分组成
订阅者向服务器 subscribe 某个主题,发布者在有消息更新事件时将消息 publish 给服务器,再由服务器转发给相关订阅者
整个过程中主要有两个概念:
- 主题(topic)
- 消息message 或者payload
2.主要的流程
- 订阅者connect到server
- 订阅者subscribe某个topic
- 发布者connect 到server
- 发布者将某个topic的消息publish给server
- server 又将这个topic的消息publish给相关订阅者
整个过程中发布者和订阅者会每隔一段时间向server确认状态,看对方是否还活着
3.mqtt-源码分析
(参考大牛的源码-https://github.com/jiejieTop/mqttclient)
3.1 主函数main
- 创建mqtt 对象
- 设置相关属性 ip port 等
- 连接服务器
- 创建处理publish事件的线程
/*
 * Demo entry point (excerpt; parts of the original are elided with "......").
 * Builds an MQTT client, configures host and credentials, connects to the
 * server, subscribes to three topics and then spawns the publish thread.
 */
int main(void)
{
int res;
pthread_t thread1;
mqtt_client_t *client = NULL;
char client_id[32];
char user_name[32];
char password[32];
printf("\nwelcome to mqttclient test...\n");
/* presumably fills each buffer with a 10-character random string - confirm against random_string() */
random_string(client_id, 10);
random_string(user_name, 10);
random_string(password, 10);
mqtt_log_init();
// Obtain an MQTT client object
client = mqtt_lease();
......
// Configure the server host, credentials and clean-session flag
mqtt_set_host(client, "120.25.213.14");
mqtt_set_client_id(client, client_id);
mqtt_set_user_name(client, user_name);
mqtt_set_password(client, password);
mqtt_set_clean_session(client, 1);
// Connect to the server
mqtt_connect(client);
// Subscribe to the topics of interest; a NULL handler is passed for topic2/topic3
mqtt_subscribe(client, "topic1", QOS0, topic1_handler);
mqtt_subscribe(client, "topic2", QOS1, NULL);
mqtt_subscribe(client, "topic3", QOS2, NULL);
// Create the thread that runs mqtt_publish_thread with the client as its argument
res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
3.2连接服务器相关处理逻辑
- 初始化网络
- 设置connect_data参数
- 连接服务器
- 创建一个线程(mqtt_yield_thread)用来处理服务器发来的报文
/*
 * Connect the client to the server and wait for the CONNACK (excerpt; the
 * declarations of connect_data, connack_data and connect_timer are elided).
 *
 * Steps: initialise and connect the network, fill connect_data from the
 * client's settings, serialize and send the CONNECT packet under
 * mqtt_write_lock, wait for CONNACK, then either start the mqtt_yield_thread
 * (first connect) or just mark the client connected (reconnect).
 * Returns the resulting status code through RETURN_ERROR().
 */
static int mqtt_connect_with_results(mqtt_client_t* c)
{
int len = 0;
int rc = MQTT_CONNECT_FAILED_ERROR;
....
/* NOTE(review): the result of network_init() is overwritten by network_connect() below without being checked. */
#ifndef MQTT_NETWORK_TYPE_NO_TLS
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, c->mqtt_ca);
#else
// Initialise the network (no CA passed in the non-TLS build)
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
#endif
// Open the network connection
rc = network_connect(c->mqtt_network);
if (MQTT_SUCCESS_ERROR != rc) {
/* NOTE(review): the early return sits inside the NULL check, so a failed connect with a NULL mqtt_network falls through - confirm this is intended. */
if (NULL != c->mqtt_network) {
network_release(c->mqtt_network);
RETURN_ERROR(rc);
}
}
// Populate connect_data from the client's configuration
connect_data.keepAliveInterval = c->mqtt_keep_alive_interval;
connect_data.cleansession = c->mqtt_clean_session;
connect_data.MQTTVersion = c->mqtt_version;
connect_data.clientID.cstring= c->mqtt_client_id;
connect_data.username.cstring = c->mqtt_user_name;
connect_data.password.cstring = c->mqtt_password;
/* Will options are only copied when the will flag is set. */
if (c->mqtt_will_flag) {
connect_data.willFlag = c->mqtt_will_flag;
connect_data.will.message.cstring = c->mqtt_will_options->will_message;
connect_data.will.qos = c->mqtt_will_options->will_qos;
connect_data.will.retained = c->mqtt_will_options->will_retained;
connect_data.will.topicName.cstring = c->mqtt_will_options->will_topic;
}
// Arm the mqtt_last_received countdown with keep-alive interval * 1000 (presumably seconds -> ms; confirm)
platform_timer_cutdown(&c->mqtt_last_received, (c->mqtt_keep_alive_interval * 1000));
platform_mutex_lock(&c->mqtt_write_lock);
/* serialize connect packet */
/* NOTE(review): on failure this jumps to exit with rc still holding the network_connect() result; if that was success the exit path treats the connect as successful - verify. */
if ((len = MQTTSerialize_connect(c->mqtt_write_buf, c->mqtt_write_buf_size, &connect_data)) <= 0)
goto exit;
platform_timer_cutdown(&connect_timer, c->mqtt_cmd_timeout);
/* send connect packet */
if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
goto exit;
// Wait for the server's CONNACK; any other outcome is a connect failure
if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {
if (MQTTDeserialize_connack(&connack_data.session_present, &connack_data.rc, c->mqtt_read_buf, c->mqtt_read_buf_size) == 1)
rc = connack_data.rc;
else
rc = MQTT_CONNECT_FAILED_ERROR;
} else
rc = MQTT_CONNECT_FAILED_ERROR;
exit:
if (rc == MQTT_SUCCESS_ERROR) {
if(NULL == c->mqtt_thread) {
/* connect success, and need init mqtt thread */
c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK);
if (NULL != c->mqtt_thread) {
/* The state is set to connected before the thread is started. */
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
/* NOTE(review): both platform_thread_startup() and platform_thread_start() are called - confirm both are required. */
platform_thread_startup(c->mqtt_thread);
platform_thread_start(c->mqtt_thread); /* start run mqtt thread */
} else {
/*creat the thread fail and disconnect the mqtt socket connect*/
network_release(c->mqtt_network);
rc = MQTT_CONNECT_FAILED_ERROR;
MQTT_LOG_W("%s:%d %s()... mqtt yield thread creat failed...", __FILE__, __LINE__, __FUNCTION__);
}
} else {
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED); /* reconnect, mqtt thread is already exists */
}
c->mqtt_ping_outstanding = 0; /* reset ping outstanding */
} else {
/* Connect failed: release the network and fall back to the initialized state. */
network_release(c->mqtt_network);
mqtt_set_client_state(c, CLIENT_STATE_INITIALIZED); /* connect failed */
}
platform_mutex_unlock(&c->mqtt_write_lock);
RETURN_ERROR(rc);
}
3.3 连接时创建的线程分析
/*
 * Abridged view of mqtt_connect_with_results() (most of the body is elided),
 * kept to show where the mqtt_yield_thread is created after the network
 * connection has been set up.
 */
static int mqtt_connect_with_results(mqtt_client_t* c)
{
......
/* The TLS build passes the CA (c->mqtt_ca) to network_init(); the non-TLS build passes NULL. */
#ifndef MQTT_NETWORK_TYPE_NO_TLS
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, c->mqtt_ca);
#else
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
#endif
rc = network_connect(c->mqtt_network);
.......
/* connect success, and need init mqtt thread */
/* The thread runs mqtt_yield_thread with the client itself as its argument. */
c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK);
RETURN_ERROR(rc);
}
client 在连接服务器的时候会在内部创建一个线程mqtt_yield_thread
/**
 * Thread entry that keeps calling mqtt_yield() for one client.
 *
 * @param arg  the mqtt_client_t this thread serves, passed as void *.
 *
 * If the client is not in the connected state on entry, a warning is logged
 * and platform_thread_stop() is called on the client's thread handle. The
 * loop ends only when mqtt_yield() reports MQTT_CLEAN_SESSION_ERROR: the
 * network is disconnected, the session is cleaned, and the thread handle is
 * detached from the client before being destroyed. A reconnect timeout is
 * only logged and the loop carries on.
 */
static void mqtt_yield_thread(void *arg)
{
    mqtt_client_t *client = (mqtt_client_t *)arg;
    platform_thread_t *dying_thread = NULL;

    if (mqtt_get_client_state(client) != CLIENT_STATE_CONNECTED) {
        MQTT_LOG_W("%s:%d %s()..., mqtt is not connected to the server...", __FILE__, __LINE__, __FUNCTION__);
        /* not connected to the server: ask the platform layer to stop this thread */
        platform_thread_stop(client->mqtt_thread);
    }

    for (;;) {
        /* all packet processing happens inside mqtt_yield() */
        int yield_rc = mqtt_yield(client, client->mqtt_cmd_timeout);

        if (yield_rc == MQTT_CLEAN_SESSION_ERROR) {
            MQTT_LOG_W("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__);
            network_disconnect(client->mqtt_network);
            mqtt_clean_session(client);
            break;
        }

        if (yield_rc == MQTT_RECONNECT_TIMEOUT_ERROR) {
            MQTT_LOG_W("%s:%d %s()..., mqtt reconnect timeout....", __FILE__, __LINE__, __FUNCTION__);
        }
    }

    /* clear the client's handle first, then destroy the saved copy */
    dying_thread = client->mqtt_thread;
    client->mqtt_thread = (platform_thread_t *)0;
    platform_thread_destroy(dying_thread);
}
/**
 * Process incoming packets for the client until the time budget runs out.
 *
 * @param c           client to service; NULL yields MQTT_FAILED_ERROR.
 * @param timeout_ms  time budget; 0 means "use c->mqtt_cmd_timeout".
 * @return MQTT_CLEAN_SESSION_ERROR when the client is in the clean-session
 *         state, MQTT_RECONNECT_TIMEOUT_ERROR when a reconnect attempt timed
 *         out, otherwise the last result of mqtt_try_reconnect() /
 *         mqtt_packet_handle() (or MQTT_SUCCESS_ERROR if the loop never ran).
 */
static int mqtt_yield(mqtt_client_t* c, int timeout_ms)
{
    int rc = MQTT_SUCCESS_ERROR;
    platform_timer_t deadline;

    if (c == NULL) {
        RETURN_ERROR(MQTT_FAILED_ERROR);
    }

    if (timeout_ms == 0) {
        timeout_ms = c->mqtt_cmd_timeout;
    }

    platform_timer_init(&deadline);
    platform_timer_cutdown(&deadline, timeout_ms);

    while (!platform_timer_is_expired(&deadline)) {
        client_state_t current = mqtt_get_client_state(c);

        if (current == CLIENT_STATE_CLEAN_SESSION) {
            RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
        }

        if (current != CLIENT_STATE_CONNECTED) {
            /* not connected: try to reconnect, and give up only on a reconnect timeout */
            rc = mqtt_try_reconnect(c);
            if (rc == MQTT_RECONNECT_TIMEOUT_ERROR) {
                RETURN_ERROR(rc);
            }
            continue;
        }

        /* connected: handle one packet within the remaining budget */
        rc = mqtt_packet_handle(c, &deadline);

        if (rc < 0) {
            /* any negative result other than "not connected" ends the loop */
            if (rc != MQTT_NOT_CONNECT_ERROR) {
                break;
            }
            MQTT_LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__);
        } else {
            /* scan the ack list: timed-out ack handlers are destroyed or resent */
            mqtt_ack_list_scan(c, 1);
        }
    }

    RETURN_ERROR(rc);
}
在mqtt_yield里面会循环处理server 的packet数据,并且会每隔一段时间ping 一次,以保证双方都alive
/**
 * Read one packet and dispatch it to the handler for its type.
 *
 * @param c      client whose connection is read.
 * @param timer  timer handed on to mqtt_read_packet() and the handlers.
 * @return the packet type when the final status is MQTT_SUCCESS_ERROR,
 *         otherwise that status code.
 *
 * For every type except CONNACK and unknown types, mqtt_keep_alive() runs
 * after the dispatch and its result replaces the handler's result.
 */
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
{
    int packet_type = 0;
    int run_keep_alive = 1;
    int rc = mqtt_read_packet(c, &packet_type, timer);

    switch (packet_type) {
    case 0:
        /* read timed out or failed; a too-short buffer is only reported, not returned */
        if (rc == MQTT_BUFFER_TOO_SHORT_ERROR) {
            MQTT_LOG_E("the client read buffer is too short, please call mqtt_set_read_buf_size() to reset the buffer size");
        }
        break;

    case PUBACK:
    case PUBCOMP:
        rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
        break;

    case SUBACK:
        rc = mqtt_suback_packet_handle(c, timer);
        break;

    case UNSUBACK:
        rc = mqtt_unsuback_packet_handle(c, timer);
        break;

    case PUBLISH:
        rc = mqtt_publish_packet_handle(c, timer);
        break;

    case PUBREC:
    case PUBREL:
        rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
        break;

    case PINGRESP:
        /* the keep-alive ping was answered */
        c->mqtt_ping_outstanding = 0;
        break;

    case CONNACK: /* already processed during connect */
    default:
        /* these go straight to the result mapping, bypassing keep-alive */
        run_keep_alive = 0;
        break;
    }

    if (run_keep_alive) {
        rc = mqtt_keep_alive(c);
    }

    if (rc == MQTT_SUCCESS_ERROR) {
        rc = packet_type;
    }

    RETURN_ERROR(rc);
}
/**
 * Keep-alive check: send a PINGREQ when either activity timer has expired.
 *
 * @param c  client to check.
 * @return the mqtt_is_connected() result when not connected or when neither
 *         timer has expired; MQTT_NOT_CONNECT_ERROR when a previous ping is
 *         still unanswered (the network is released and the client is marked
 *         disconnected); otherwise the result of sending the PINGREQ.
 */
int mqtt_keep_alive(mqtt_client_t* c)
{
    int rc = mqtt_is_connected(c);

    if (rc != MQTT_SUCCESS_ERROR) {
        RETURN_ERROR(rc);
    }

    /* nothing to do while both the last-sent and last-received timers are still running */
    if (!platform_timer_is_expired(&c->mqtt_last_sent) && !platform_timer_is_expired(&c->mqtt_last_received)) {
        RETURN_ERROR(rc);
    }

    if (!c->mqtt_ping_outstanding) {
        /* NOTE(review): send_timer is passed uninitialised; presumably mqtt_send_packet() arms it - confirm */
        platform_timer_t send_timer;
        int ping_len = MQTTSerialize_pingreq(c->mqtt_write_buf, c->mqtt_write_buf_size);

        if (ping_len > 0) {
            rc = mqtt_send_packet(c, ping_len, &send_timer);
        }
        /* counted as outstanding whether or not the send went through */
        c->mqtt_ping_outstanding++;
    } else {
        /* the previous PINGREQ got no PINGRESP within the keep-alive interval */
        MQTT_LOG_W("%s:%d %s()... ping outstanding", __FILE__, __LINE__, __FUNCTION__);
        /* release the network (socket descriptor) before marking the client disconnected */
        network_release(c->mqtt_network);
        mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
        rc = MQTT_NOT_CONNECT_ERROR;
    }

    RETURN_ERROR(rc);
}
3.4 client 订阅
订阅的时候一般会提供主题topic 和对应处理主题消息的message handler函数
mqtt_subscribe(client, "topic1", QOS0, topic1_handler);
/**
 * Subscribe to a topic filter and register the handler for its messages.
 *
 * @param c             connected client; otherwise MQTT_NOT_CONNECT_ERROR.
 * @param topic_filter  topic filter string to subscribe to.
 * @param qos           requested QoS for the subscription.
 * @param handler       message callback; NULL selects default_msg_handler.
 * @return MQTT_SUBSCRIBE_ERROR if serialization fails, the send error if the
 *         packet cannot be sent, MQTT_MEM_NOT_ENOUGH_ERROR if the handler
 *         record cannot be allocated, otherwise the result of recording the
 *         pending SUBACK in the ack list.
 *
 * Serialization, sending and ack recording all run under mqtt_write_lock.
 */
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
{
    int rc = MQTT_SUBSCRIBE_ERROR;
    int pkt_len = 0;
    uint16_t pkt_id;
    platform_timer_t send_timer;
    message_handlers_t *record = NULL;
    MQTTString topic_str = MQTTString_initializer;

    topic_str.cstring = (char *)topic_filter;

    if (mqtt_get_client_state(c) != CLIENT_STATE_CONNECTED) {
        RETURN_ERROR(MQTT_NOT_CONNECT_ERROR);
    }

    platform_mutex_lock(&c->mqtt_write_lock);

    pkt_id = mqtt_get_next_packet_id(c);

    /* build the SUBSCRIBE packet for this single topic, then send it */
    pkt_len = MQTTSerialize_subscribe(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, pkt_id, 1, &topic_str, (int*)&qos);

    if (pkt_len > 0) {
        rc = mqtt_send_packet(c, pkt_len, &send_timer);

        if (rc == MQTT_SUCCESS_ERROR) {
            /* fall back to the default handler when the caller gave none */
            if (handler == NULL) {
                handler = default_msg_handler;
            }

            /* create the handler record and park it in the ack list until the SUBACK arrives */
            record = mqtt_msg_handler_create(topic_filter, qos, handler);

            if (record != NULL) {
                rc = mqtt_ack_list_record(c, SUBACK, pkt_id, pkt_len, record);
            } else {
                rc = MQTT_MEM_NOT_ENOUGH_ERROR;
            }
        }
    }

    platform_mutex_unlock(&c->mqtt_write_lock);

    RETURN_ERROR(rc);
}
/**
 * Allocate and fill a message-handler record for one topic filter.
 *
 * @param topic_filter  topic filter string; the pointer is stored as-is, not copied.
 * @param qos           QoS stored in the record.
 * @param handler       callback stored in the record.
 * @return the new record with its list node initialised, or NULL when the
 *         allocation fails.
 */
static message_handlers_t *mqtt_msg_handler_create(const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
{
    message_handlers_t *entry = (message_handlers_t *) platform_memory_alloc(sizeof(message_handlers_t));

    if (NULL != entry) {
        mqtt_list_init(&entry->list);

        entry->qos = qos;
        entry->handler = handler;           /* callback registered for this filter */
        entry->topic_filter = topic_filter;
    }

    return entry;
}
4.总结
对于mqtt client,消息之间的通信就是一种subscribe 和publish机制
- client需要与server建立连接
- 建立连接后会创建一个线程处理server端的消息,并定时ping server端以确认其状态
- client 会向服务器订阅某个topic的消息
- 订阅的过程中会将对应的topic 消息处理函数加入到message handler 链表中
- 当服务端有消息过来的时候,会根据消息类型从链表中取出对应的message handler进行处理
这里只分析相关模型和流程,具体的协议后续再更新吧……