0
点赞
收藏
分享

微信扫一扫

Python实现Udp、Tcp通信

眸晓 2022-05-03 阅读 48
# coding=utf-8
coding = "utf-8"
import socket
import time
import json
from threading import Thread


class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        """
        :param obj:
        :return str:
        """
        if isinstance(obj, bytes):
            return "BYTES" + obj.decode(coding)  # BYTES:bytes类型标识符
        return json.JSONEncoder.default(self, obj)


bufsiz = 1024
timeout_time = 1
break_timeout_times = 31


def dict_to_bytes(dict: dict):
    return json.dumps(dict, ensure_ascii=False, cls=JsonEncoder, indent=4).encode(coding)


def bytes_to_dict(bytes: bytes):
    x = json.loads(bytes.decode(coding))
    for key, value in x.items():
        if type(value) == str:
            if value.startswith("BYTES"):  # 遇到bytes类型标识符
                x[key] = value[5:].encode()
    return x


class TcpServer:
    def __init__(self, address=("", 80)):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.address = address
        self.socket.bind(self.address)
        self.socket.listen(128)

    def callback(self, socket, address, data):
        print(address, data)

    def start_recv(self):
        def target(self):
            while True:
                client_socket, client_address = self.socket.accept()

                def target(self, ):
                    while True:
                        recv_data = client_socket.recv(bufsiz)  # 接收1024个字节
                        if recv_data:
                            self.callback(self, client_socket, client_address, recv_data)
                        else:
                            client_socket.close()
                            break

                t = Thread(target=target, args=(self,))
                t.start()

        t = Thread(target=target, args=(self,))
        t.start()


class TcpClient:
    def __init__(self, socket_time_out=timeout_time):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.settimeout(socket_time_out)
        self.server_address: str = "-1.-1.-1.-1"

    def callback(self, data):
        print(self.server_address, data)

    def connect(self, server_address):
        self.server_address = server_address
        self.socket.connect(server_address)

    def send(self, data):
        self.socket.send(data)

    def close(self):
        self.socket.close()

    def start_recv(self):
        def target(self):
            while True:
                try:
                    recv_data = self.socket.recv(bufsiz)  # 接收1024个字节
                except:
                    break
                if recv_data:
                    self.callback(recv_data)
                else:
                    self.server_address = "-1.-1.-1.-1"
                    self.socket.close()
                    break

        Thread(target=target, args=(self,)).start()


class Udp:
    def __init__(self, port=7788, timeout=timeout_time):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.settimeout(timeout)
        self.recving = False
        self.socket.bind(("", port))

    def sendto(self, data, address):
        self.socket.sendto(data, address)

    def callback(self, data):
        print(data.decode(coding))

    def start_recv(self):
        if not self.recving:
            self.recving = True

            def target(self, ):
                while True:
                    if not self.recving:
                        break
                    try:
                        recv_data = self.socket.recv(bufsiz)
                    except:
                        continue
                    self.callback(recv_data)

            Thread(target=target, args=(self,)).start()

#使用此Udp可以传输字典
class MyUdp():
    def __init__(self, port=7788, timeout_time=timeout_time):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.settimeout(timeout_time)
        self.socket.bind(("", port))
        self.new_send_id = 0
        self.recving = False
        self.mode = {"send": 0, "received": 1}
        self.send_threads = [False] * 16384

    def handle_data_dict(self, data_dict, address):
        if data_dict['mode'] == 0:
            self.callback(data_dict["data"], address)
            self.socket.sendto(dict_to_bytes({"mode": 1, "send_id": data_dict["send_id"]}), address)
        if data_dict['mode'] == 1:
            self.send_threads[data_dict['send_id']] = False

    def sendto(self, data, address):
        self.start_recv()
        send_id = self.new_send_id % len(self.send_threads)

        def target(self, data=data, address=address, send_id=send_id, timeout_time=timeout_time,
                   break_timeout_times=break_timeout_times):
            self.send_threads[send_id] = True
            send_times = 0
            while True:
                self.socket.sendto(dict_to_bytes({"mode": 0, "data": data, "send_id": send_id}), address)
                send_times += 1
                time.sleep(timeout_time)
                if self.send_threads[send_id] == False or send_times > break_timeout_times:
                    break

        Thread(target=target, args=(self, data, address, send_id)).start()

    def callback(self, data, address):
        print(address, data.decode(coding))

    def start_recv(self):
        if not self.recving:
            self.recving = True

            def target(self):
                while True:
                    if not self.recving:
                        break
                    try:
                        recv_data, address = self.socket.recvfrom(bufsiz)
                    except:
                        continue
                    recv_data_dict = bytes_to_dict(recv_data)
                    self.handle_data_dict(recv_data_dict, address)

            Thread(target=target, args=(self,)).start()

    def end_recv(self):
        self.recving = False
举报

相关推荐

0 条评论