0
点赞
收藏
分享

微信扫一扫

内容安全补充

以沫的窝 02-24 16:00 阅读 4

思路:

1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程

2.消息中心需要独立出来,websockets服务端放消息,主线程去消息

3.根据思路设计模块:

                        1.消息库

                        2.服务端

                        3.主线程

                        4.多线程

先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name

修改websocket_client.py中data的信息,发送不同消息

一、消息处理中心 message_base.py

根据设备,创建储存设备消息,提取设备消息的功能

from queue import Queue


class MessageBase:
    def __init__(self):
        self.data = dict()

    def add(self, device, data):
        if device in self.data:
            self.data[device].put(data)
        else:
            self.data[device] = Queue()
            self.data[device].put(data)

    def get(self, device):
        data_queue: Queue = self.data.get(device)
        if not data_queue or data_queue.empty():
            return None
        data = data_queue.get()
        return data


if __name__ == '__main__':
    mb = MessageBase()
    mb.add("a", "asdasd")
    mb.add("a", "11111111")
    print(mb.data)
    data = mb.get("a")
    print(data)

二、服务端:websocket_server.py

从客户端接收消息,并存到消息中心

import asyncio
import json
import threading
import websockets
##
from message_base import MessageBase


class WebServer:
    def __init__(self, host, port, message_base: MessageBase):
        self.host = host
        self.port = port
        self.clients = []
        self.message_base = message_base

    async def echo(self, websocket, path):
        self.clients.append(websocket)
        client_ip, client_port = websocket.remote_address
        while True:
            try:
                recv_text = await websocket.recv()
                data = json.loads(recv_text)
                device = data.get("device")
                if device:
                    self.message_base.add(device, data)
                else:
                    continue
            except websockets.ConnectionClosed:
                print("ConnectionClosed...")  # 链接断开
                self.clients.remove(websocket)
                break
            except websockets.InvalidState:
                print("InvalidState...")  # 无效状态
                self.clients.remove(websocket)
                break
            except Exception as e:
                print(e)

    def connect(self):
        print("连接成功!")
        asyncio.set_event_loop(asyncio.new_event_loop())
        start_server = websockets.serve(self.echo, self.host, self.port)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()

    def run(self):
        t = threading.Thread(target=self.connect)
        t.start()
        print("已启动!")


if __name__ == '__main__':
    mb = MessageBase()
    ws = WebServer("192.168.6.28", 8001, mb)
    ws.run()

三、设备功能 device_function.py

每个设备对应的线程功能,可以统一也可以写多个功能

class DeviceFunc:
    def __init__(self, device_name, data):
        self.device_name = device_name
        self.data = data

    def show_data(self):
        if self.data:
            print(self.device_name, "收到消息:", self.data.get("value"))

四、主线程 main.py

初始化所有功能模块,并运行主线程

from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFunc


class MainThread:
    def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):
        self.message_base = message_base
        self.websocket_server = websocket_server
        self.device_list = device_list

    def run_server(self):
        self.websocket_server.run()

    def run(self):
        self.run_server()
        while True:
            for device in self.device_list:
                try:
                    # 开始根据设备即功能处理消息
                    data = self.message_base.get(device)
                    if not data:
                        continue
                    df = DeviceFunc(device, data)
                    df.show_data()
                except Exception as err:
                    pass


if __name__ == '__main__':
    mb = MessageBase()
    ws = WebServer("192.168.6.28", 8000, mb)
    device_list = ["aa", "bb", "cc"]

    mt = MainThread(mb, ws, device_list)
    mt.run()

五、客户端

给服务端发送消息,测试用

import json

import websocket


class WebClient:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.conn = None
        self.flag = False

    def connect(self):
        try:
            url = f"ws://{self.host}:{self.port}"
            self.conn = websocket.create_connection(url)
            self.flag = True
            print("连接成功")
        except Exception as err:
            self.flag = False
            print("连接失败", err)

    def close(self):
        self.conn.close()

    def recv(self):
        data = self.conn.recv(1024)
        print(data)

    def send(self, data):
        self.conn.send(data)
        print("发送成功")


if __name__ == '__main__':
    host = "192.168.6.28"
    port = 8000
    ws = WebClient(host, port)
    if not ws.flag:
        ws.connect()
    data = {"device": "bb", "value": "123"}
    data = {"device": "cc", "value": "123"}
    data = json.dumps(data)
    ws.send(data)
举报

相关推荐

0 条评论