0
点赞
收藏
分享

微信扫一扫

Python采用并发查询mysql以及调用API灌数据 (五)- 查询mysql数据,拼接进行POST请求...

yeamy 2022-07-12 阅读 94

Python采用并发查询mysql以及调用API灌数据 (五)- 查询mysql数据,拼接进行POST请求..._mysql


前情回顾

上一篇文章已经编写了http请求的基本类方法封装,那么本章节我们来继续编写使用mysql查询后的拼接数据发送POST请求。

实战任务

本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。

执行流程如下


Python采用并发查询mysql以及调用API灌数据 (五)- 查询mysql数据,拼接进行POST请求..._字段_02


那么根据流程所需要的功能,需要以下的实例进行支撑:
1.并发实例
2.查询数据实例
3.执行post请求实例

目标:编写Http执行POST请求的基本类方法

编写test03.py查询mysql相关对应字段数据

# -*- coding: utf-8 -*-

from tools.MysqlTools import MysqldbHelper
import pymysql
from tools.PostTools import PostHelper

if __name__ == "__main__":

# 定义数据库访问参数
config = {
'host': '******#注释',
'port': 3361,
'user': 'root',
'passwd': '******#注释',
'charset': 'utf8',
'cursorclass': pymysql.cursors.DictCursor
}

# 初始化打开数据库连接
mydb = MysqldbHelper(config)

# 选择数据库
DB_NAME = '******#注释'
# mydb.createDataBase(DB_NAME)

# 选择数据库
print "========= 选择数据库%s ===========" % DB_NAME
mydb.selectDataBase(DB_NAME)

#选择表
TABLE_NAME = '******#注释'

# 数据查询
print "========= 数据查询 ==========="
select_fields = [
"id",
******#注释
"1",
"1",
"null",
"NOW()",
"last_sync_time",
]
select_order = "order by id desc limit 1"
select_result = mydb.select(TABLE_NAME, fields=select_fields,order=select_order)

# 拆分打印的字段
for row in select_result:
print "id=", row['id']
print "user_id=", row['user_id']
******#注释
print "1=", row['1']
print "1=", row['1']
print "null=", None # 注意:查询null的需要写为None
print "NOW()=", row['NOW()']
print "last_sync_time=", row['last_sync_time']
print

执行后的结果如下:

========= 数据查询 ===========
id= 1066511830261694464
user_id= ******#注释
user_code= None
******#注释
null= None
NOW()= 2018-11-27 11:20:22
last_sync_time= 2018-11-25 10:00:10

从上面的代码可以看出,写一个表的查询,由于字段太多,重复去写这个过程的话也是很累的,那么再抽象多一层model模型,专门进行这个数据处理的。

编写model类,抽象查询的过程方法


Python采用并发查询mysql以及调用API灌数据 (五)- 查询mysql数据,拼接进行POST请求..._字段_03

models.py

我新建了一个core文件夹目录,然后新建一个models,专门用来处理查询以及调用API发送请求的业务处理。

models.py代码如下:

# -*- coding: utf-8 -*-

from tools.MysqlTools import MysqldbHelper

class ModelHelper(object): # 继承object类所有方法

# 初始化数据库连接
def __init__(self , config):
self.mydb = MysqldbHelper(config) # 初始化打开数据库连接

def selectTable(self,DB_NAME,TABLE_NAME,fields,order):
# 选择数据库
self.mydb.selectDataBase(DB_NAME)
# 数据查询
print "========= 数据查询 ==========="
result = self.mydb.select(TABLE_NAME, fields=fields,order=order)
# 返回查询的数据
return result

编写test04.py测试文件,执行测试一下:

# -*- coding: utf-8 -*-

from tools.MysqlTools import MysqldbHelper
import pymysql
from tools.PostTools import PostHelper
from core.models import ModelHelper

if __name__ == "__main__":

# 定义数据库访问参数
config = {
'host': '#####注释####',
'port': 3361,
'user': 'root',
'passwd': '#####注释####',
'charset': 'utf8',
'cursorclass': pymysql.cursors.DictCursor
}
# 初始化数据模型
model = ModelHelper(config)
# 设置需要查询的数据库
DB_NAME = '#####注释####'
# 设置需要查询的表明
TABLE_NAME = '#####注释####'
# 数据查询
select_fields = [
"id",
#####注释####
"1",
"1",
"null",
"NOW()",
"last_sync_time",
]
select_order = "order by id desc limit 1"
select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order)

# 拆分打印的字段
for row in select_result:
print "id=", row['id']
#####注释####
print "1=", row['1']
print "1=", row['1']
print "null=", None
print "NOW()=", row['NOW()']
print "last_sync_time=", row['last_sync_time']
print

测试结果如下:

id= 1066511830261694464
#####注释####
1= 1
1= 1
null= None
NOW()= 2018-11-27 11:45:08
last_sync_time= 2018-11-25 10:00:10

那么已经抽象了这部分查询在model中处理了,还有下一步请求API也要写入到model中自动处理。
那么下面来继续写写。

将返回的查询结果转化为字典类型数据

其中查询的旧表字段与新表的字段应该要用字典进行一一映射关联,方便后续调用。
1、定义字典存储 旧表字段 《==》新表字段的映射关系
2、获取旧表字段数据,进行数据查询
3、获取新表字段对应存储数据,再次使用API请求新表,灌入数据

# 设置字段映射字典: 旧表查询字段 ==》 新表的字段
dict_fields = {
"id": "id",
"user_id": "user_id",
## 注释部分字段
"1": "purpose",
"1": "status",
"null": "data_source",
"NOW()": "create_time",
"last_sync_time": "last_modify_time"
}
# 获取旧表字段数组
select_fields = []
for key, value in dict_fields.items():
print "key = %s , value = %s" % (key,value)
select_fields.append(key)

print "select_fields = "
print select_fields

执行结果如下:

# 映射字典的查询数据
key = user_code , value = user_code
key = null , value = data_source
###### 注释部分 #######
key = NOW() , value = create_time
key = 1 , value = status
key = user_level , value = user_level

# 获取生成旧表需要查询的字段
select_fields = [
'census_town',
###### 注释部分 #######
'NOW()', '1',
'user_level'
]

1、那么下面就可以根据获取的字段数据,进行mysql数据查询
2、然后生成一个body请求体字典数据,但是此时body的请求体key是旧表的字段,请求API的时候需要新表的字段,那么就需要进行字段替换
3、再写一个字段映射字典的循环,生成请求API的new_body

# 此时已有查询字段的数组
print "select_fields = "
print select_fields

select_order = "order by id desc limit 2"
select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order)

# 循环生成每条查询数据的请求body
body = {}
for result in select_result:
for field in select_fields:
if field == "null":
body[field] = None
else:
body[field] = result[field]

# 打印查看已生成的body数据
for field in select_fields:
print body[field]

print body

# 更新body的字段为新表的字段
new_body = {}
for key, value in dict_fields.items():
print "key = %s , value = %s" % (key,value)
new_body[value] = body[key]

print "new_body="
print new_body

执行结果如下:




Python采用并发查询mysql以及调用API灌数据 (五)- 查询mysql数据,拼接进行POST请求..._mysql_04


那么上面的过程最好写在model中,这样可以方便使用。

编写model增加生成请求API的body数据相关方法


Python采用并发查询mysql以及调用API灌数据 (五)- 查询mysql数据,拼接进行POST请求..._字段_05


# -*- coding: utf-8 -*-

from tools.MysqlTools import MysqldbHelper
from tools.PostTools import PostHelper

class ModelHelper(object): # 继承object类所有方法

# 初始化数据库连接
def __init__(self , config):
self.mydb = MysqldbHelper(config) # 初始化打开数据库连接

# 根据设置的旧表字段,查询旧库的数据库数据
def selectTable(self,DB_NAME,TABLE_NAME,fields,order):
# 选择数据库
self.mydb.selectDataBase(DB_NAME)
# 数据查询
result = self.mydb.select(TABLE_NAME, fields=fields,order=order)
# 返回查询的数据
return result

# 根据字段映射字典获取旧表字段数组
def getSelectFields(self,dict_fields):
# 获取旧表字段数组
select_fields = []
for key, value in dict_fields.items():
# print "key = %s , value = %s" % (key, value)
select_fields.append(key)
return select_fields

# 根据查询的结果以及字段字典,转化为请求API的body
def convertApiBody(self,result,dict_fields):
# 循环生成每条查询数据的请求body
body = {}
for result in result:
for field in result:
if field == "null":
body[field] = None
else:
body[field] = result[field]
# 更新body的字段为新表的字段
new_body = {}
for key, value in dict_fields.items():
# print "key = %s , value = %s" % (key, value)
if key == "null":
new_body[value] = None
else:
new_body[value] = body[key]
return new_body

使用model方法,以及执行结果:




Python采用并发查询mysql以及调用API灌数据 (五)- 查询mysql数据,拼接进行POST请求..._数据_06


那么下一步就是再编写一个执行API的方法。

但是在请求API之前,需要将body序列化为json格式,这个存在datetime类型导致序列化失败的情况,下一个篇章继续。


Python采用并发查询mysql以及调用API灌数据 (五)- 查询mysql数据,拼接进行POST请求..._数据_07





举报

相关推荐

0 条评论