coding = "utf-8"
import socket
import time
import json
from threading import Thread
class JsonEncoder(json.JSONEncoder):
def default(self, obj):
"""
:param obj:
:return str:
"""
if isinstance(obj, bytes):
return "BYTES" + obj.decode(coding)
return json.JSONEncoder.default(self, obj)
bufsiz = 1024
timeout_time = 1
break_timeout_times = 31
def dict_to_bytes(dict: dict):
return json.dumps(dict, ensure_ascii=False, cls=JsonEncoder, indent=4).encode(coding)
def bytes_to_dict(bytes: bytes):
x = json.loads(bytes.decode(coding))
for key, value in x.items():
if type(value) == str:
if value.startswith("BYTES"):
x[key] = value[5:].encode()
return x
class TcpServer:
def __init__(self, address=("", 80)):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.address = address
self.socket.bind(self.address)
self.socket.listen(128)
def callback(self, socket, address, data):
print(address, data)
def start_recv(self):
def target(self):
while True:
client_socket, client_address = self.socket.accept()
def target(self, ):
while True:
recv_data = client_socket.recv(bufsiz)
if recv_data:
self.callback(self, client_socket, client_address, recv_data)
else:
client_socket.close()
break
t = Thread(target=target, args=(self,))
t.start()
t = Thread(target=target, args=(self,))
t.start()
class TcpClient:
def __init__(self, socket_time_out=timeout_time):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(socket_time_out)
self.server_address: str = "-1.-1.-1.-1"
def callback(self, data):
print(self.server_address, data)
def connect(self, server_address):
self.server_address = server_address
self.socket.connect(server_address)
def send(self, data):
self.socket.send(data)
def close(self):
self.socket.close()
def start_recv(self):
def target(self):
while True:
try:
recv_data = self.socket.recv(bufsiz)
except:
break
if recv_data:
self.callback(recv_data)
else:
self.server_address = "-1.-1.-1.-1"
self.socket.close()
break
Thread(target=target, args=(self,)).start()
class Udp:
def __init__(self, port=7788, timeout=timeout_time):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(timeout)
self.recving = False
self.socket.bind(("", port))
def sendto(self, data, address):
self.socket.sendto(data, address)
def callback(self, data):
print(data.decode(coding))
def start_recv(self):
if not self.recving:
self.recving = True
def target(self, ):
while True:
if not self.recving:
break
try:
recv_data = self.socket.recv(bufsiz)
except:
continue
self.callback(recv_data)
Thread(target=target, args=(self,)).start()
class MyUdp():
def __init__(self, port=7788, timeout_time=timeout_time):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(timeout_time)
self.socket.bind(("", port))
self.new_send_id = 0
self.recving = False
self.mode = {"send": 0, "received": 1}
self.send_threads = [False] * 16384
def handle_data_dict(self, data_dict, address):
if data_dict['mode'] == 0:
self.callback(data_dict["data"], address)
self.socket.sendto(dict_to_bytes({"mode": 1, "send_id": data_dict["send_id"]}), address)
if data_dict['mode'] == 1:
self.send_threads[data_dict['send_id']] = False
def sendto(self, data, address):
self.start_recv()
send_id = self.new_send_id % len(self.send_threads)
def target(self, data=data, address=address, send_id=send_id, timeout_time=timeout_time,
break_timeout_times=break_timeout_times):
self.send_threads[send_id] = True
send_times = 0
while True:
self.socket.sendto(dict_to_bytes({"mode": 0, "data": data, "send_id": send_id}), address)
send_times += 1
time.sleep(timeout_time)
if self.send_threads[send_id] == False or send_times > break_timeout_times:
break
Thread(target=target, args=(self, data, address, send_id)).start()
def callback(self, data, address):
print(address, data.decode(coding))
def start_recv(self):
if not self.recving:
self.recving = True
def target(self):
while True:
if not self.recving:
break
try:
recv_data, address = self.socket.recvfrom(bufsiz)
except:
continue
recv_data_dict = bytes_to_dict(recv_data)
self.handle_data_dict(recv_data_dict, address)
Thread(target=target, args=(self,)).start()
def end_recv(self):
self.recving = False