文章目录
前言
数据库帮助类
信息设置函数
# Connection settings. These are placeholders: setting() overwrites every one of them.
MYSQL_HOST = 'localhost'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = ''
MYSQL_DATABASE = ''
# When true, setting() drops the configured database before (re)creating it.
RESTART_INIT = True
# Ellipsis marks "not initialised yet"; exec_sql() and commit_sql() test for it with `is ...`.
mysql = ...
cursor = ...
def setting(host='localhost', port=3306, user='root', password='password', database='database', init=True):
    """
    Store the connection settings in the module globals and open the shared connection.

    A first connection is opened without selecting a database so the database can be
    (re)created; it is then closed and a second connection bound to the database is
    stored in the module-level ``mysql`` / ``cursor`` globals.

    :param host: database host name or address
    :param port: database port
    :param user: database user
    :param password: password of the database user
    :param database: name of the database to use
    :param init: when true, drop the database before creating it (all data is lost)
    :return: None; on any ``pymysql.Error`` the error is printed and logged and the
        process is terminated through ``exit`` (``SystemExit``)
    """
    global MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE, RESTART_INIT, mysql, cursor
    MYSQL_HOST = host
    MYSQL_PORT = port
    MYSQL_USER = user
    MYSQL_PASSWORD = password
    MYSQL_DATABASE = database
    RESTART_INIT = init

    # Options shared by the bootstrap connection and the final connection.
    connect_options = dict(
        host=MYSQL_HOST,
        port=MYSQL_PORT,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.SSDictCursor,
    )
    # A backtick inside a quoted identifier must be doubled, otherwise the name
    # could terminate the identifier and alter the statement.
    quoted_database = '`' + str(MYSQL_DATABASE).replace('`', '``') + '`'

    try:
        mysql = pymysql.connect(**connect_options)
        cursor = mysql.cursor()
        if RESTART_INIT:
            cursor.execute(f'DROP DATABASE IF EXISTS {quoted_database};')
        # Always make sure the database exists so the second connection can select it.
        cursor.execute(f'CREATE DATABASE IF NOT EXISTS {quoted_database};')
        mysql.commit()
        cursor.close()
        mysql.close()

        mysql = pymysql.connect(database=MYSQL_DATABASE, **connect_options)
        cursor = mysql.cursor()
    except pymysql.err.OperationalError:
        print('用户名和密码不匹配!')
        # The password is deliberately NOT written to the log; logging.exception
        # already records the real driver error and its traceback.
        logging.exception(f'用户名 {MYSQL_USER} 和密码不匹配!')
        # NOTE(review): presumably gives the log output time to appear before the exit message.
        time.sleep(.01)
        exit('虽然程序异常退出了,但不要慌,一切尽在我的掌握之中!')
    except pymysql.Error as e:
        print(f'exception: {e.args}')
        logging.exception(e.args)
        exit('虽然程序异常退出了,但不要慌,一切尽在我的掌握之中!')
sql查询函数
def exec_sql(sql):
    """
    Run a query on the shared connection and return every row.

    :param sql: complete SQL statement to execute
    :return: list of the fetched rows, or an empty list when nothing was fetched
    """
    not_ready = mysql is ... or cursor is ...
    if not_ready:
        # setting() has not been called yet, so there is no connection to use.
        print('请先初始化设置 setting 函数!')
        logging.error('请先初始化设置 setting 函数!')
        exit('虽然程序异常退出了,但不要慌,一切尽在我的掌握之中!')

    mysql.ping()
    cursor.execute(sql)
    rows = cursor.fetchall()
    return list(rows) if rows else []
sql执行函数
def commit_sql(sql):
    """
    Execute a data-changing statement on the shared connection and commit it.

    :param sql: complete SQL statement to execute
    :return: True when the statement was committed, False when a ``pymysql.Error``
        occurred (the error is printed and logged and the transaction rolled back)
    """
    not_ready = mysql is ... or cursor is ...
    if not_ready:
        # setting() has not been called yet, so there is no connection to use.
        print('请先初始化设置 setting 函数!')
        logging.error('请先初始化设置 setting 函数!')
        exit('虽然程序异常退出了,但不要慌,一切尽在我的掌握之中!')

    try:
        mysql.ping()
        cursor.execute(sql)
        mysql.commit()
    except pymysql.Error as error:
        print(f'exception: {error.args}')
        logging.exception(error.args)
        mysql.rollback()
        return False
    return True
整型字段
class Int:
    """Column definition for an ``int`` field; the SQL fragment is stored in ``self.sql``."""

    def __init__(self, *, max_length=11, null=False, primary=False, only=False, auto=False, default=''):
        """
        Build the column definition.

        :param max_length: display width appended as ``(n)``; a falsy value omits it
        :param null: when true, adds ``NOT NULL`` (the flag disallows NULL, despite its name)
        :param primary: when true, adds ``PRIMARY KEY``
        :param only: when true, adds ``UNIQUE``
        :param auto: when true, adds ``AUTO_INCREMENT``
        :param default: value inserted verbatim after ``DEFAULT``; ``''`` means no default
        """
        clauses = ['int' + (f'({max_length})' if max_length else '')]
        if null:
            clauses.append('NOT NULL')
        if primary:
            clauses.append('PRIMARY KEY')
        if only:
            clauses.append('UNIQUE')
        if auto:
            clauses.append('AUTO_INCREMENT')
        # A plain truthiness test would silently drop a numeric zero default, so
        # numbers always count; '' / None / False still mean "no default".
        has_default = default is not False and (bool(default) or isinstance(default, (int, float)))
        if has_default:
            clauses.append(f'DEFAULT {default}')
        self.sql = ' '.join(clauses)
浮点型字段
class Float:
    """Column definition for a ``float`` field; the SQL fragment is stored in ``self.sql``."""

    def __init__(self, *, max_length=0, null=False, primary=False, only=False, default=''):
        """
        Build the column definition.

        :param max_length: precision appended as ``(n)``; a falsy value omits it
        :param null: when true, adds ``NOT NULL`` (the flag disallows NULL, despite its name)
        :param primary: when true, adds ``PRIMARY KEY``
        :param only: when true, adds ``UNIQUE``
        :param default: value inserted verbatim after ``DEFAULT``; ``''`` means no default
        """
        clauses = ['float' + (f'({max_length})' if max_length else '')]
        if null:
            clauses.append('NOT NULL')
        if primary:
            clauses.append('PRIMARY KEY')
        if only:
            clauses.append('UNIQUE')
        # A plain truthiness test would silently drop a 0 / 0.0 default, so
        # numbers always count; '' / None / False still mean "no default".
        has_default = default is not False and (bool(default) or isinstance(default, (int, float)))
        if has_default:
            clauses.append(f'DEFAULT {default}')
        self.sql = ' '.join(clauses)
字符串型字段
class Varchar:
    """Column definition for a ``varchar`` field; the SQL fragment is stored in ``self.sql``."""

    def __init__(self, *, max_length=255, null=False, primary=False, only=False, default=''):
        """
        Build the column definition.

        :param max_length: maximum length appended as ``(n)``; a falsy value omits it
        :param null: when true, adds ``NOT NULL`` (the flag disallows NULL, despite its name)
        :param primary: when true, adds ``PRIMARY KEY``
        :param only: when true, adds ``UNIQUE``
        :param default: text inserted verbatim after ``DEFAULT`` (it is not quoted here,
            so a string default must already carry its own SQL quotes); ``''`` means no default
        """
        clauses = ['varchar' + (f'({max_length})' if max_length else '') + ' CHARACTER SET utf8mb4']
        if null:
            clauses.append('NOT NULL')
        if primary:
            clauses.append('PRIMARY KEY')
        if only:
            clauses.append('UNIQUE')
        # A plain truthiness test would silently drop a numeric zero default, so
        # numbers always count; '' / None / False still mean "no default".
        has_default = default is not False and (bool(default) or isinstance(default, (int, float)))
        if has_default:
            clauses.append(f'DEFAULT {default}')
        self.sql = ' '.join(clauses)
文本型字段
class Text:
    """Column definition for a ``text`` field; the SQL fragment is stored in ``self.sql``."""

    def __init__(self, *, max_length=0, null=False, primary=False, only=False, default=''):
        """
        Build the column definition.

        :param max_length: length appended as ``(n)``; a falsy value omits it
        :param null: when true, adds ``NOT NULL`` (the flag disallows NULL, despite its name)
        :param primary: when true, adds ``PRIMARY KEY``
        :param only: when true, adds ``UNIQUE``
        :param default: text inserted verbatim after ``DEFAULT``; ``''`` means no default

        NOTE(review): MySQL restricts keys and literal defaults on TEXT columns, so
        ``primary`` / ``only`` / ``default`` may be rejected by the server - confirm
        against the targeted MySQL version.
        """
        clauses = ['text' + (f'({max_length})' if max_length else '') + ' CHARACTER SET utf8mb4']
        if null:
            clauses.append('NOT NULL')
        if primary:
            clauses.append('PRIMARY KEY')
        if only:
            clauses.append('UNIQUE')
        # A plain truthiness test would silently drop a numeric zero default, so
        # numbers always count; '' / None / False still mean "no default".
        has_default = default is not False and (bool(default) or isinstance(default, (int, float)))
        if has_default:
            clauses.append(f'DEFAULT {default}')
        self.sql = ' '.join(clauses)
数据表类
class Table:
    """
    Minimal table wrapper: builds SQL text and runs it through exec_sql() / commit_sql().

    Every value is rendered with ``str()`` and sent as a double-quoted string literal
    (so ``None`` is sent as the text ``"None"``). Table and column names are placed
    between backticks as given.
    """

    def __init__(self, table, **kwargs):
        """
        Remember the schema and create the table if it does not exist yet.

        :param table: table name
        :param kwargs: column name -> field object exposing a ``sql`` attribute
        """
        columns = ', '.join(f'`{key}` {kwargs[key].sql}' for key in kwargs)
        self.fields = kwargs
        self.table = table
        self.sql = f'CREATE TABLE IF NOT EXISTS `{table}` ({columns}) DEFAULT CHARSET=utf8mb4;'
        self.create()

    @staticmethod
    def _literal(value):
        """Render *value* as a double-quoted SQL string literal with special characters escaped."""
        # Without escaping, a quote or backslash inside the value would end the
        # literal early and corrupt (or inject into) the statement.
        # NOTE(review): assumes the server does not run with NO_BACKSLASH_ESCAPES / ANSI_QUOTES.
        from pymysql.converters import escape_string
        return f'"{escape_string(str(value))}"'

    @classmethod
    def _pairs(cls, mapping, separator):
        """Render ``mapping`` as `` `key` = "value" `` pairs joined by *separator*."""
        return separator.join(f'`{key}` = {cls._literal(mapping[key])}' for key in mapping)

    @classmethod
    def _literals(cls, values):
        """Render an iterable of values as a comma-separated list of string literals."""
        return ', '.join(cls._literal(value) for value in values)

    def field(self):
        """Return the list of column names given at construction."""
        return list(self.fields.keys())

    def record(self):
        """Return the ``COUNT(*)`` of the table, or 0 when the query returned no row."""
        res = self.select(fields='COUNT(*)')
        if res:
            return res[0]['COUNT(*)']
        return 0

    def create(self):
        """Run the stored CREATE TABLE statement; return True on success, False otherwise."""
        return bool(commit_sql(self.sql))

    def select(self, *, fields='*', where=None):
        """
        Select rows.

        :param fields: column expression text, or a list of them (joined with ``, ``)
        :param where: optional column -> value mapping, combined with ``AND`` equality tests
        :return: list of rows as returned by exec_sql()
        """
        if isinstance(fields, list):
            fields = ', '.join(fields)
        sql = f'SELECT {fields} FROM `{self.table}`'
        if where:
            sql += ' WHERE ' + self._pairs(where, ' AND ')
        return exec_sql(sql + ';')

    def insert(self, *, fields=False, values):
        """
        Insert one row.

        :param fields: when true, ``values`` must be a dict and its keys are written
            as the column list
        :param values: dict (values used in order), list of values, or any other
            object whose ``str()`` is placed verbatim between the parentheses
        :return: True on success, False otherwise
        """
        sql = f'INSERT INTO `{self.table}` '
        if fields:
            columns = ', '.join(f'`{key}`' for key in values.keys())
            sql += f'({columns}) VALUES ({self._literals(values.values())});'
        elif isinstance(values, dict):
            sql += f'VALUES ({self._literals(values.values())});'
        elif isinstance(values, list):
            sql += f'VALUES ({self._literals(values)});'
        else:
            # Raw fragment supplied by the caller: passed through untouched.
            sql += 'VALUES (' + str(values) + ');'
        return bool(commit_sql(sql))

    def update(self, *, where, values):
        """
        Update the rows matching ``where``.

        :param where: column -> value mapping, combined with ``AND`` equality tests
        :param values: column -> new value mapping
        :return: True on success, False otherwise
        """
        sql = (f'UPDATE `{self.table}` SET ' + self._pairs(values, ', ')
               + ' WHERE ' + self._pairs(where, ' AND ') + ';')
        return bool(commit_sql(sql))

    def delete(self, *, where):
        """
        Delete the rows matching ``where``.

        :param where: column -> value mapping, combined with ``AND`` equality tests
        :return: True on success, False otherwise
        """
        sql = f'DELETE FROM `{self.table}` WHERE ' + self._pairs(where, ' AND ') + ';'
        return bool(commit_sql(sql))
实体生成类
property函数生成函数
def make_property_function(variable_list, method_list, *, indentation=1, condition=True, private='__'):
    """
    Generate the source text of ``property`` accessors.

    The text is assembled with tab characters, which are expanded to four spaces
    before it is returned.

    :param variable_list: mapping of attribute name -> type expression text (used in
        the generated ``isinstance`` check), or a plain list/tuple/set of names
    :param method_list: extra accessors to emit, any of ``'setter'``, ``'getter'``,
        ``'deleter'`` (the ``@property`` itself is always emitted)
    :param indentation: indentation level of the decorator / ``def`` lines
    :param condition: add a simple ``isinstance`` check to setters (disabled
        automatically when ``variable_list`` carries no type information)
    :param private: prefix of the backing attribute, ``'_'`` or ``'__'``
    :return: the generated source text; every accessor starts with a blank line
    """
    # `__builtins__` is a module only inside `__main__`; in an imported module it is a
    # dict, so dir(__builtins__) would list dict methods. Ask the real module instead.
    import builtins as builtins_module
    builtin_names = set(dir(builtins_module))

    # A plain sequence of names carries no types, so no isinstance check can be generated.
    if isinstance(variable_list, (list, tuple, set, frozenset)):
        condition = False

    pad = '\t' * indentation
    chunks = []
    for v in variable_list:
        # Setter arguments that would shadow a builtin get a leading underscore.
        argument = f'_{v}' if v in builtin_names else v

        chunks.append(f'\n\n{pad}@property\n{pad}def {v}(self):\n{pad}\treturn self.{private}{v}')
        if 'setter' in method_list:
            chunks.append(f'\n\n{pad}@{v}.setter\n{pad}def {v}(self, {argument}):')
            if condition:
                chunks.append(f'\n{pad}\tif not isinstance({argument}, {variable_list[v]}):')
                chunks.append(f'\n{pad}\t\traise ValueError("variable `{v}` must be type of `{variable_list[v]}`")')
            chunks.append(f'\n{pad}\tself.{private}{v} = {argument}')
        if 'getter' in method_list:
            chunks.append(f'\n\n{pad}@{v}.getter\n{pad}def {v}(self):\n{pad}\treturn self.{private}{v}')
        if 'deleter' in method_list:
            chunks.append(f'\n\n{pad}@{v}.deleter\n{pad}def {v}(self):\n{pad}\tdel self.{private}{v}')
    return ''.join(chunks).replace('\t', '    ')
property函数生成函数存根生成函数
def make_property_stub(variable_list, method_list, *, indentation=1):
    """
    Generate the ``.pyi`` stub text of ``property`` accessors.

    The text is assembled with tab characters, which are expanded to four spaces
    (PEP 8 indentation) before it is returned.

    :param variable_list: mapping of attribute name -> type annotation text
    :param method_list: extra accessors to emit, any of ``'setter'``, ``'getter'``,
        ``'deleter'`` (the ``@property`` itself is always emitted)
    :param indentation: indentation level of the decorator / ``def`` lines
    :return: the generated stub text; every line starts with a newline
    """
    # `__builtins__` is a module only inside `__main__`; in an imported module it is a
    # dict, so dir(__builtins__) would list dict methods. Ask the real module instead.
    import builtins as builtins_module
    builtin_names = set(dir(builtins_module))

    pad = '\t' * indentation
    chunks = []
    for v in variable_list:
        annotation = variable_list[v]
        # Setter arguments that would shadow a builtin get a leading underscore.
        argument = f'_{v}' if v in builtin_names else v

        chunks.append(f'\n{pad}@property\n{pad}def {v}(self) -> {annotation}: ...')
        if 'setter' in method_list:
            chunks.append(f'\n{pad}@{v}.setter\n{pad}def {v}(self, {argument}: {annotation}) -> NoReturn: ...')
        if 'getter' in method_list:
            chunks.append(f'\n{pad}@{v}.getter\n{pad}def {v}(self) -> {annotation}: ...')
        if 'deleter' in method_list:
            chunks.append(f'\n{pad}@{v}.deleter\n{pad}def {v}(self) -> NoReturn: ...')
    return ''.join(chunks).replace('\t', '    ')
生成python代码文件和python存根文件函数
NEWLINE = '\n'  # newline constant, usable where a backslash is not allowed inside an f-string expression


def make_python_file(file_name, data_type, data_necessary, data_unnecessary, *, project_name='',
                     mysql_tools_file='MySQLController', create_file=True, property_use=True, method_list=None):
    """
    Generate the source of a model module (``.py``) and of its stub (``.pyi``).

    :param file_name: model name; also the file name without suffix
    :param data_type: mapping of parameter name -> ``int`` / ``float`` / ``str``
    :param data_necessary: list of required parameter names; the first one is used
        as the lookup column by the generated ``commit`` / ``delete`` methods
    :param data_unnecessary: list of optional parameter names
    :param project_name: project name written into the file header
    :param mysql_tools_file: import path of the database helper module
        (for example ``ToolsCreator.MySQLController``)
    :param create_file: when true, write ``<file_name>.py`` and ``<file_name>.pyi``
        (lower-cased name) into the current working directory
    :param property_use: when true, append generated ``property`` accessors
    :param method_list: extra accessors to emit, any of ``'setter'``, ``'getter'``,
        ``'deleter'`` (the ``@property`` itself is always emitted)
    :return: tuple ``(.py file content, .pyi file content)``
    :raises ValueError: when ``data_type`` contains a type other than int, float or str
    """
    # Raised explicitly rather than asserted: asserts are stripped under `python -O`.
    if not set(data_type.values()) <= {int, float, str}:
        raise ValueError('当前版本只支持 int、float、str 三种类型!')
    if not method_list:
        method_list = []

    string = ''
    string2 = ''
    if property_use:
        variable_list = {parameter: data_type[parameter].__name__ for parameter in data_type}
        string = make_property_function(variable_list, method_list)
        string2 = make_property_stub(variable_list, method_list)

    upper = file_name.upper()
    lower = file_name.lower()
    title = file_name.title()
    lookup = data_necessary[0]
    now = time.localtime()  # sampled once so the Day and Time header lines agree

    def column(parameter, not_null):
        """Return one entry line of the generated ``*_FIELD_TYPE`` dict."""
        kind = {int: 'Int', float: 'Float'}.get(data_type[parameter], 'Varchar')
        return f'    {parameter!r}: {kind}(null={not_null})'

    # Fragments of the generated module, rendered with the indentation they need there.
    type_items = ', '.join(f'{parameter!r}: {data_type[parameter].__name__}' for parameter in data_type)
    re_items = f',{NEWLINE}'.join(f"    {parameter!r}: re.compile(R'.*')" for parameter in data_type)
    necessary_columns = f',{NEWLINE}'.join(column(parameter, True) for parameter in data_necessary)
    unnecessary_columns = f',{NEWLINE}'.join(column(parameter, False) for parameter in data_unnecessary)
    init_params = ', '.join(data_necessary) + ''.join(f', {parameter}=None' for parameter in data_unnecessary)
    init_body = NEWLINE.join(f'        self.__{parameter} = {parameter}' for parameter in data_type)
    # Dict comprehension (as source text) collecting the truthy attributes of an instance.
    values_expr = (f"{{key: self.__dict__.get(f'_{title}__{{key}}') "
                   f"for key in {upper}_NECESSARY + {upper}_UNNECESSARY "
                   f"if self.__dict__.get(f'_{title}__{{key}}')}}")

    # Fragments of the generated stub.
    stub_necessary = f',{NEWLINE}'.join(
        f'                 {parameter}: {data_type[parameter].__name__}' for parameter in data_necessary)
    stub_unnecessary = f',{NEWLINE}'.join(
        f'                 {parameter}: {data_type[parameter].__name__} = ...' for parameter in data_unnecessary)
    stub_attributes = NEWLINE.join(
        f'        self.__{parameter}: {data_type[parameter].__name__} = ...' for parameter in data_type)

    file_py = f"""
# _*_ coding:utf-8 _*_
# Project: {project_name}
# FileName: {lower}.py
# ComputerUser:19305
# Day: {time.strftime('%Y/%m/%d', now)}
# Time: {time.strftime('%H:%M', now)}
# IDE: PyCharm
# 2022年,所有bug都将会被丢到海里喂鲨鱼!我说的!不容反驳!
import re
from {mysql_tools_file} import setting, Table, Int, Float, Varchar
{upper}_TYPE = {{{type_items}}}
{upper}_NECESSARY = {data_necessary!r}
{upper}_UNNECESSARY = {data_unnecessary!r}
{upper}_RE_TYPE = {{
{re_items}
}}
{upper}_FIELD_TYPE = {{
    'id': Int(null=True, primary=True, auto=True), # 自动递增id
{necessary_columns}{',' if data_unnecessary else ''}
{unnecessary_columns}{NEWLINE if data_unnecessary else ''}}}
def check(function):
    def type_check(self, **kwargs):
        if kwargs.get('id'):
            self._{title}__id = kwargs.get('id')
            del kwargs['id']
        for n, attributes in enumerate(zip(list(kwargs.keys()), tuple(kwargs.values()))):
            if {upper}_TYPE.get(attributes[0], str) == float and isinstance(attributes[1], int):
                if kwargs.get(attributes[0], None):
                    kwargs[attributes[0]] = float(kwargs[attributes[0]])
            elif {upper}_TYPE.get(attributes[0], str) == str:
                kwargs[attributes[0]] = str(kwargs[attributes[0]])
            elif not isinstance(attributes[1], {upper}_TYPE.get(attributes[0], str)):
                raise ValueError(f'Variable `{{attributes[0]}}` must be type of `{{{upper}_TYPE.get(attributes[0], str)}}`')
            elif not {upper}_RE_TYPE.get(attributes[0], re.compile('.*?')).fullmatch(str(attributes[1])):
                raise ValueError(f'Variable `{{attributes[0]}}`="{{attributes[1]}}" is not a correct value`')
        return function(self, **kwargs)
    return type_check
class {title}:
    @check
    def __init__(self, *, {init_params}):
{init_body}
    def commit(self, table): # 插入或更新数据
        if table.select(fields='{lookup}', where=dict({lookup}=self.__{lookup})):
            return table.update(where=dict({lookup}=self.__{lookup}), values={values_expr})
        else:
            return table.insert(fields=True, values={values_expr})
    def delete(self, table): # 删除数据
        if table.select(fields='{lookup}', where=dict({lookup}=self.__{lookup})):
            return table.delete(where=dict({lookup}=self.__{lookup}))
        return False{string}
def make_all_{lower}s(table):
    def format_data(data):
        data = {{key: {upper}_TYPE[key](data[key]) if key != 'id' else int(data[key]) for key in data if data[key]}} # 改变当键为id时的处理方式
        return data
    for {lower} in table.select():
        yield {title}(**format_data({lower}))
if __name__ == '__main__':
    setting()
    {lower}s = Table('{lower}s', **{upper}_FIELD_TYPE)
""".lstrip('\n')

    file_pyi = f"""
from re import Pattern
from types import FunctionType
from {mysql_tools_file} import Int, Float, Varchar, Table
from typing import Iterator, Dict, List, Callable, NoReturn
{upper}_TYPE: Dict[str, Callable] = ... # 参数类型
{upper}_NECESSARY: List[str] = ... # 必需要的参数
{upper}_UNNECESSARY: List[str] = ... # 非必需参数
{upper}_RE_TYPE: Dict[str, Pattern] = ... # 参数字符串正则检验
{upper}_FIELD_TYPE: Dict[str, Int | Float | Varchar] = ... # 参数数据库字段对应
def check(function) -> FunctionType: # 检查函数装饰器
    def type_check(self: Callable = ..., **kwargs: Dict[str, int | float | str]) -> NoReturn | ValueError: ... # 检查各参数是否符合类型
class {title}:
    def __init__(self, *,
{stub_necessary}{',' if data_unnecessary else ''}
{stub_unnecessary}
                 ) -> NoReturn:
{stub_attributes}
    def commit(self, table: Table) -> bool: ...
    def delete(self, table: Table) -> bool: ...{string2}
def make_all_{lower}s(table: Table) -> Iterator[{title}]: # 生成所有对象
    def format_data(data: [str, int | float | str]) -> Dict[str, int | float | str]: ... # 格式化数据
""".strip('\n')

    if create_file:
        # Context managers guarantee both files are flushed and closed.
        with open(f'{lower}.py', 'w', encoding='utf-8') as py_handle:
            py_handle.write(file_py)
        with open(f'{lower}.pyi', 'w', encoding='utf-8') as pyi_handle:
            pyi_handle.write(file_pyi)
    return file_py, file_pyi