需求:
在不知道表名和字段名的情况下,查找出哪些字段里包含“关键字”的数据。
思路1:整库全量dump,按照表名导出txt纯文本格式,然后用grep命令过滤。
find ./ -name "*.sql" | xargs grep '关键字' >> result.log
思路2:用python全量扫描跑批,涉及到varchar的字段都扫一遍。
import pymysql
from concurrent.futures import ThreadPoolExecutor, as_completed
config = {
"host": "127.0.0.1",
"user": "admin",
"password": "123456",
"db": "test"
}
# 函数用于在给定列中搜索关键字
def search_column(table_name, column_name):
query = f"SELECT * FROM `{table_name}` WHERE `{column_name}` LIKE '%关键字%'"
with pymysql.connect(**config) as conn:
with conn.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchall()
if len(result) > 0:
with open("result.txt", "a", encoding="utf-8") as result_file:
result_file.write(f"表名: {table_name},列名: {column_name}\n")
#for row in result:
#result_file.write(str(row))
#result_file.write("\n")
try:
# 获取表名列表
with pymysql.connect(**config) as conn:
with conn.cursor() as cursor:
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
# 迭代所有表和列名称,并在每个列中搜索关键字
with ThreadPoolExecutor(max_workers=10) as executor: # 根据要求更改 max_workers
all_tasks = []
for table in tables:
table_name = table[0]
with pymysql.connect(**config) as conn:
with conn.cursor() as cursor:
cursor.execute(f"select COLUMN_NAME from information_schema.COLUMNS where TABLE_SCHEMA = 'test' and TABLE_NAME = '{table_name}' and DATA_TYPE in ('char','varchar','text','tinytext','mediumtext','longtext') ")
columns = cursor.fetchall()
for column in columns:
column_name = column[0]
# 调度一个任务来在此列中搜索关键字
task = executor.submit(search_column, table_name, column_name)
all_tasks.append(task)
# 等待所有任务完成
for task in as_completed(all_tasks):
pass
except (pymysql.err.OperationalError, TypeError, FileNotFoundError) as e:
print(f"错误信息:{str(e)}")
print("程序运行结束。")