0
点赞
收藏
分享

微信扫一扫

在不知道表名和字段名的情况下,查找出哪些字段里包含“关键字”的数据。

腾讯优测 2023-05-29 阅读 82

需求:

在不知道表名和字段名的情况下,查找出哪些字段里包含“关键字”的数据。

思路1:整库全量dump,按照表名导出txt纯文本格式,然后用grep命令过滤。

find ./ -name "*.sql" | xargs grep '关键字' >> result.log

思路2:用python全量扫描跑批,涉及到varchar的字段都扫一遍。

import pymysql
from concurrent.futures import ThreadPoolExecutor, as_completed

config = {
    "host": "127.0.0.1",
    "user": "admin",
    "password": "123456",
    "db": "test"
}

 
# 函数用于在给定列中搜索关键字
def search_column(table_name, column_name):
    query = f"SELECT * FROM `{table_name}` WHERE `{column_name}` LIKE '%关键字%'"
    with pymysql.connect(**config) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query)
            result = cursor.fetchall()
            if len(result) > 0:
                with open("result.txt", "a", encoding="utf-8") as result_file:
                    result_file.write(f"表名: {table_name},列名: {column_name}\n")
                    #for row in result:
                        #result_file.write(str(row))
                        #result_file.write("\n")

try:
    # 获取表名列表
    with pymysql.connect(**config) as conn:
        with conn.cursor() as cursor:
            cursor.execute("SHOW TABLES")
            tables = cursor.fetchall()
 
    # 迭代所有表和列名称,并在每个列中搜索关键字
    with ThreadPoolExecutor(max_workers=10) as executor: # 根据要求更改 max_workers
        all_tasks = []
        for table in tables:
            table_name = table[0]
            with pymysql.connect(**config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute(f"select COLUMN_NAME from information_schema.COLUMNS where TABLE_SCHEMA = 'test' and TABLE_NAME = '{table_name}' and DATA_TYPE in ('char','varchar','text','tinytext','mediumtext','longtext') ")
                    columns = cursor.fetchall()
                    for column in columns:
                        column_name = column[0]
                        # 调度一个任务来在此列中搜索关键字
                        task = executor.submit(search_column, table_name, column_name)
                        all_tasks.append(task)

        # 等待所有任务完成
        for task in as_completed(all_tasks):
            pass

except (pymysql.err.OperationalError, TypeError, FileNotFoundError) as e:
    print(f"错误信息:{str(e)}")

print("程序运行结束。")

举报

相关推荐

0 条评论