0
点赞
收藏
分享

微信扫一扫

mqtt安装使用教程。(基于rabbitmq插件,docker部署,k8s部署,python教程)


​​


​​

全栈工程师开发手册 (作者:栾鹏)
​​ 架构系列文章​​

Docker安装RabbitMQ配置MQTT

使用RabbitMQ作为MQTT服务端,Eclipse Paho作为客户端。宿主机系统为ubuntu16.04

Docker下载镜像

docker pull daocloud.io/library/rabbitmq:3.7.4

启动RabbitMQ

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 -p 1883:1883 -p 15675:15675 daocloud.io/library/rabbitmq:3.7.4

注意映射容器端口

15672 是rabbitmq management管理界面默认访问端口
5672 是amqp默认端口
1883 是mqtt tcp协议默认端口
15675 是web_mqtt websocket协议默认端口

启用插件

默认安装后我们需要手动开启rabbitmq_management插件,rabbitmq_mqtt插件和rabbitmq_web_mqtt插件。

执行如下三条命令

docker exec <容器ID> rabbitmq-plugins enable rabbitmq_management
docker exec <容器ID> rabbitmq-plugins enable rabbitmq_mqtt
docker exec <容器ID> rabbitmq-plugins enable rabbitmq_web_mqtt

当然你也可以写个脚本start.sh,复制到容器中

/usr/sbin/rabbitmq-plugins enable rabbitmq_management

/usr/sbin/rabbitmq-plugins enable rabbitmq_mqtt

/usr/sbin/rabbitmq-plugins enable rabbitmq_web_mqtt

进入容器执行这个脚本。

sh start.sh

开放宿主机端口

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=1883/tcp --permanent
firewall-cmd --zone=public --add-port=15675/tcp --permanent
firewall-cmd --reload

Python MQTT客户端实现

安装python包

pip install paho-mqtt

消费者demo

import sys
import os

import paho.mqtt.client as mqtt
import time


HOST="192.168.2.177"
PORT = 59085



def client_loop():
client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
client = mqtt.Client(client_id, userdata=None, protocol=4) # ClientId不能重复,所以使用当前时间 4就是MQTT3.1.1
client.username_pw_set("guest", "guest") # 必须设置,否则会返回「Connected with result code 4」
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(HOST, PORT, keepalive=60)
print('mqtt connect success')
client.loop_forever(timeout=1.0)


# 开始时订阅 callback
def on_subscribe(self, client, userdata, mid, granted_qos):
print("Begin subscribe topic with ", mid)



# 链接成功的函数
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("test_topic") # 设置topic 0/#



import json
# 接收到消息的函数
def on_message(client, userdata, msg):
# print(msg)
# print(client,userdata,msg)
print(msg.topic+" "+msg.payload.decode("utf-8"))
content = json.loads(msg.payload.decode("utf-8"))
# print(content)

if __name__ == '__main__':
client_loop()

生产者demo

import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import time


HOST = "192.168.2.177"
PORT = 59085


# rc 是操作结果的状态码,0 代表成功
# 断开连接时的 callback
def on_disconnect(self, client, userdata, rc):
print(self.get_time(), " end a loop with code " + str(rc))

# 链接成功
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("test")

# 接收到消息
def on_message(client, userdata, msg):
print(msg.topic+" "+msg.payload.decode("utf-8"))

if __name__ == '__main__':
for i in range(11):
client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
client = mqtt.Client(client_id, userdata=None, protocol=4) # ClientId不能重复,所以使用当前时间,# 4就是MQTT3.1.1
client.username_pw_set("guest", "guest") # 必须设置,否则会返回「Connected with result code 4」
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.connect(HOST, PORT, keepalive=60)
pub_result=client.publish("test_topic", payload="你好 MQTT", qos=0, retain=False) # 发布消息
if pub_result.is_published:
print(" success pub message with id: ", pub_result.mid)
else:
print("failed to pub message")
time.sleep(1)

# for i in range(10):
# publish.single("test", "你好 MQTT", qos = 1,hostname=HOST,port=PORT, client_id=client_id,auth = {'username':"guest", 'password':"guest"})

k8s部署rabbit,启动manager,mqtt,websocket

我已经上传到github上了,大家可以下载安装https://github.com/626626cdllp/k8s

# 集群中的每个rabbitmq都要是相同的cookie
echo $(openssl rand -base64 32) > erlang.cookie
kubectl -n cloudai-2 create secret generic erlang.cookie --from-file=erlang.cookie
kubectl create -f mqtt-serviceAccount.yaml
kubectl create -f mqtt-service.yaml
kubectl create -f mqtt-deployment.yaml


举报

相关推荐

0 条评论