import os
import subprocess
import pymysql.cursors
# Names of the tables that export_sql_file(), import_by_file() and
# truncate_table() iterate over. Empty here, so each of those functions
# processes nothing until this list is populated.
ALL_TABLES = []
def subprocess_popen(statement):
    """Run a shell command and echo its standard output line by line.

    The command is executed through the shell (``shell=True``), so shell
    redirections such as ``>`` and ``<`` inside *statement* are honoured.
    Output is streamed until the child closes its stdout, after which the
    child is waited for.

    Args:
        statement: Shell command line to execute.

    Returns:
        The exit status of the command (0 on success).
    """
    # The context manager closes the pipe and waits for the child on exit,
    # so neither a file descriptor nor a zombie process is left behind.
    with subprocess.Popen(statement, shell=True, stdout=subprocess.PIPE) as process:
        # Read to EOF instead of guessing completion from blank lines or from
        # poll(): blank output lines must not stop the reader, output produced
        # just before exit must not be dropped, and a command that fails with
        # a non-zero status must still end the loop.
        for raw_line in process.stdout:
            # errors="replace" keeps non-UTF-8 output from aborting the run.
            print(raw_line.decode("utf-8", errors="replace").rstrip("\r\n"))
        return process.wait()
def export_sql_file():
    """Dump every table listed in ``ALL_TABLES`` to its own ``.sql`` file.

    For each table a ``mysqldump`` command is built, printed and run through
    ``subprocess_popen``. The dump is data-only (``-t``), restricted by the
    ``--where`` filter in the command, and redirected to
    ``~/Desktop/export_sql_dir/<table>.sql``.

    Connection settings are the hard-coded ``MYSQL_INFO`` values below. The
    ``-p`` option is only added when a password is configured.
    """
    import shlex  # local import: only needed for quoting shell arguments

    MYSQL_INFO = {
        "DB": "db_name",
        "HOST": "127.0.0.1",
        "PORT": "3306",
        "USER": "root",
        "PASSWORD": "",
    }
    dump_dir = os.path.expanduser("~/Desktop/export_sql_dir")
    # The shell redirect below fails if the target directory is missing.
    os.makedirs(dump_dir, exist_ok=True)

    password_cmd = ""
    if MYSQL_INFO["PASSWORD"]:
        # shlex.quote keeps passwords containing quotes/spaces intact.
        password_cmd = f"-p{shlex.quote(MYSQL_INFO['PASSWORD'])}"

    # Raise start_index to resume a partially completed export.
    start_index = 0
    remaining = ALL_TABLES[start_index:]
    for index_count, table_name in enumerate(remaining, start=start_index):
        print(f"-------------export {table_name} ------------- index of ALL_TABLES: {index_count}")
        sql_path = shlex.quote(f"{dump_dir}/{table_name}.sql")
        dump_cmd = (
            f"mysqldump --column-statistics=0 -u{MYSQL_INFO['USER']} "
            f"-P {MYSQL_INFO['PORT']} {password_cmd} -h {MYSQL_INFO['HOST']} "
            f"--default-character-set=utf8 -t {MYSQL_INFO['DB']} "
            "--single-transaction --no-tablespaces "
            f"--tables {table_name} --where='id=1000000000' > {sql_path}"
        )
        print(dump_cmd)
        subprocess_popen(dump_cmd)
        print("")
def import_by_file():
    """Load each table's dump file from the export directory into MySQL.

    For every table in ``ALL_TABLES`` a ``mysql`` command is built that reads
    ``~/Desktop/export_sql_dir/<table>.sql`` on stdin; the command is printed
    and then run through ``subprocess_popen``.

    Connection settings are the hard-coded ``MYSQL_INFO`` values below.
    """
    import shlex  # local import: only needed for quoting shell arguments

    MYSQL_INFO = {
        "DB": "db_name",
        "HOST": "127.0.0.1",
        "PORT": "3306",
        "USER": "root",
        "PASSWORD": "",
    }
    dump_dir = os.path.expanduser("~/Desktop/export_sql_dir")

    # Only pass -p when a password is set: a bare "-p" makes the mysql client
    # prompt interactively, which would stall this non-interactive run.
    password_cmd = ""
    if MYSQL_INFO["PASSWORD"]:
        password_cmd = f"-p{shlex.quote(MYSQL_INFO['PASSWORD'])}"

    # Raise start_index to resume a partially completed import.
    start_index = 0
    remaining = ALL_TABLES[start_index:]
    for index_count, table_name in enumerate(remaining, start=start_index):
        print(f"-------------import {table_name} ------------- index of ALL_TABLES: {index_count}")
        sql_path = shlex.quote(f"{dump_dir}/{table_name}.sql")
        import_cmd = (
            f"mysql -u{MYSQL_INFO['USER']} -P{MYSQL_INFO['PORT']} {password_cmd} "
            f"-h {MYSQL_INFO['HOST']} {MYSQL_INFO['DB']} < {sql_path}"
        )
        # Print before running so the command is visible while it executes.
        print(import_cmd)
        subprocess_popen(import_cmd)
        print("")
def truncate_table(dry_run=True):
    """Print, and optionally execute, ``TRUNCATE TABLE`` for every table.

    A connection is opened with the hard-coded ``MYSQL_INFO`` settings and
    one ``TRUNCATE TABLE`` statement is generated per entry in ``ALL_TABLES``.

    Args:
        dry_run: When true (the default) the statements are only printed,
            which is what this function has always done. Pass ``False`` to
            actually run each statement against the database.
    """
    MYSQL_INFO = {
        "DB": "db_name",
        "HOST": "127.0.0.1",
        "PORT": "3306",
        "USER": "root",
        "PASSWORD": "",
    }
    connection = pymysql.connect(
        host=MYSQL_INFO["HOST"],
        user=MYSQL_INFO["USER"],
        password=MYSQL_INFO["PASSWORD"],
        database=MYSQL_INFO["DB"],
        port=int(MYSQL_INFO["PORT"]),  # stored as a string; connect() gets an int
        cursorclass=pymysql.cursors.DictCursor,
    )
    with connection:
        with connection.cursor() as cursor:
            # Raise start_index to resume a partially completed run.
            start_index = 0
            remaining = ALL_TABLES[start_index:]
            for index_count, table_name in enumerate(remaining, start=start_index):
                print(f"-------------truncate {table_name} ------------- index of ALL_TABLES: {index_count}")
                sql = f"TRUNCATE TABLE {table_name}"
                print(sql)
                if not dry_run:
                    # Destructive: removes every row of the table.
                    cursor.execute(sql)
                print("")
# Script entry point: running this file directly performs the export step only.
if __name__ == "__main__":
    export_sql_file()