一、模块
Json模块
-
json.dumps()
-
json.loads()
# json模块使用
# 导入模块
import json # 直接导入模块
# 1-打开读取jiso文件
with open('D:/Python大数据就业/4-Python基础/day07-模块-网络-多任务/4-资料/order_detial.json','r',encoding='utf-8') as f:
data_lines = f.readlines()
print(data_lines)
# 2- 对读取的文件数据进行处理
# 需要将每行的json数据转为字典处理
for line in data_lines:
# 对每行数据去除\n
line_str = line.replace('\n','')
print(line_str)
# 将json字符串转为字典 loads 将字典数据转为json字符串 json_str = json.dumps(字典数据)
# 如果直接导入模块 需要使用 模块名.函数() 模块名.变量
line_dict = json.loads(line_str)
print(type(line_dict))
# 使用字典操作方法处理数据
total_price= line_dict['total_price']
print(total_price)
Datetime模块
# 日期时间模块
from datetime import datetime,timedelta
# 获取当前日期时间
dt_now = datetime.now()
print(dt_now)
# 获取时间戳 timestamp需要将日期传递到方法中
unixtime_now = datetime.timestamp(dt_now)
print(unixtime_now)
# 将时间戳数据转为datetime时间
dt = datetime.fromtimestamp(1617021920)
print(dt)
# 日期的加减
# 获取加减的时间单位
t = timedelta(days=7)
dt_add = dt_now + t
print(dt_add)
dt_sub = dt_now - t
print(dt_sub)
# 字符串时间 转为datetime类型的时间
dt_str = '2017-10-20 15:20:22'
dt2 = datetime.strptime(dt_str,'%Y-%m-%d %H:%M:%S')
print(type(dt2))
# 将datetime类型数据转为 字符串
dt_now_str = dt_now.strftime('%Y/%m/%d %H:%M:%S')
print(dt_now_str)
Decimal模块
# Decimal模块使用
# 导入模块中的所有函数,类,变量
from decimal import *
# 将小数数据转为Decimal
d = Decimal(3.1415926)
print(d)
# 指定保留小数 会进行四舍五入
d2 = d.quantize(Decimal('0.000'))
print(d2)
d3 = Decimal(5.123).quantize(Decimal('0.000'))
d4 = d2+d3
print(d4)
random模块
# random使用
# 将randint函数通过as命名了一个别称
# randint 可以根据指定范围随机产生一个整数数据
from random import randint as rt
num = rt(0,3)
print(num)
name = ['张三','李四','王五','赵六','aa','bb']
# 随机产生下标
index = rt(0,len(name)-1)
print(name[index])
# 验证码
date_str = '123456789asdfghjklqwertyuiopzxcvbnm'
index1 = rt(0,len(date_str)-1)
index2 = rt(0,len(date_str)-1)
index3 = rt(0,len(date_str)-1)
index4 = rt(0,len(date_str)-1)
code = date_str[index1] + date_str[index2] + date_str[index3] +date_str[index4]
print(code)
requests模块使用
# 请求服务器获取json数据
import requests
# 1-发送请求
# 定义请求网址
url = 'https://api.oioweb.cn/api/weather/weather?city_name=上海市'
# 定义请求头部数据
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
}
# 发送get请求
respose = requests.get(url,headers=headers) # 定义变量接收响应的数据
# 对返回的json数据进行解析,转为字典
response_dict = respose.json()
print(response_dict)
# 提取需要的字段数据
# 获取风向数据
wind_direction = response_dict['result']['wind_direction']
print(wind_direction)
# 获取html页面中数据
import requests
from lxml import etree
# 1-发送请求
# 定义请求网址
url = 'https://resource.ityxb.com/booklist/find.html?cz-pc-dh'
# 定义请求头部数据
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
}
# 发送get请求
respose = requests.get(url,headers=headers) # 定义变量接收响应的数据
# 获取html的页面数据
html_text = respose.text
# 将html页面转为element元素数据
html_element = etree.HTML(html_text)
# 使用xpath语法开发获取数据
book_name1 = html_element.xpath('//*[@id="content-container"]/div[2]/div[3]/div/div[1]/div[1]/div[2]/p/a/text()')[0]
book_name2 = html_element.xpath('//*[@id="content-container"]/div[2]/div[3]/div/div[1]/div[2]/div[2]/p/a/text()')[0]
book_name3 = html_element.xpath('//*[@id="content-container"]/div[2]/div[3]/div/div[1]/div[3]/div[2]/p/a/text()')[0]
book_name4 = html_element.xpath('//*[@id="content-container"]/div[2]/div[3]/div/div[1]/div[4]/div[2]/p/a/text()')[0]
print(book_name1)
print(book_name2)
print(book_name3)
print(book_name4)
pymysql模块使用
# 使用pymysql模块操作数据
import pymysql
# 创建mysql的连接
clinet = pymysql.connect(host='localhost',port=3306,user='root',password='root')
# 生成游标操作数据库
cur = clinet.cursor()
# 数据库操作
# 本质是写sql语句,写在字符串中
sql_str = '''
create database if not exists pydata45 charset=utf8
'''
# 执行sql
cur.execute(sql_str)
# 创建表
sql_str2 = '''
create table if not exists pydata45.user(
id int,
name varchar(20),
age int,
gender varchar(20)
)
'''
# 执行sql语句
cur.execute(sql_str2)
# 写入数据
# 将列表数据写表中
sql_str3 = '''
insert into pydata45.user values(%s,%s,%s,%s)
'''
data_list= [1,'张三',20,'男']
# 执行写入语句
cur.execute(sql_str3,data_list)
# 写入数据必须有commit
clinet.commit()
# 将字典数据写入进入
sql_str4 = '''
insert into pydata45.user values(%(id)s,%(username)s,%(age)s,%(gender)s)
'''
data_dict= {'id':2,'username':'李四','age':20,'gender':'男'}
# 执行写入语句
cur.execute(sql_str4,data_dict)
# 写入数据必须有commit
clinet.commit()
# 查询写入的数据
sql_str5 = '''
select * from pydata45.user
'''
# 执行sql语句
cur.execute(sql_str5)
# 获取查询的数据
data = cur.fetchall()
print(data)
# 关闭连接
cur.close()
clinet.close()
自定义模块
# 自定义的模块文件
# 封装业务代码
name = '张三'
def add_func(a,b):
data = a+b
return data
class Student:
def __int__(self,name,age):
self.name = name
self.age = age
def func(self):
print(self.name)
print(self.age)
# 开发人员自己的代码文件
from itcast import name,add_func,Student
print(name)
res = add_func(10,20)
print(res)
s = Student('张三',20)
s.func()
二、网络编程
# 使用socket开发客户端
import socket
# socket.AF_INET 支持ipV4的ip地址
# socket.SOCK_STREAM 使用TCP协议
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 指定连接服务器的ip地址和端口
s.connect(('127.0.0.1',8000))
# 发送消息
# encode('utf-8') 将字符串数据转为bytes
s.send('hello!!'.encode('utf-8'))
# 接受返回的消息
# 接受的数据也是bytes 需要转为字符串 decode('utf-8')
data = s.recv(1024)
print(data.decode('utf-8'))
# 关闭
s.close()
# 服务端开发
import socket
# 创建socket对象
# socket.AF_INET 指定使用ipv4访问服务
# socket.SOCK_STREAM 使用tcp协议通讯
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 指定服务器绑定的ip地址和端口
server.bind(('192.168.174.53',8888))
# 指定监听请求的数量
server.listen(100)
# 需要再循环内持续处理客户端的请求
while True:
# client_socket接受客户端发送的数据
client_socket,client_ip=server.accept()
print(client_ip)
# 提取客户端的数据
# 接受的数据是byte类型 需要转为字符串
# decode
data = client_socket.recv(1024)
print(data.decode('utf-8'))
# 返回响应数据
# 'hello python'.encode('utf-8') 字符串转为byte
client_socket.send('hello python'.encode('utf-8'))
if data.decode('utf-8') == '1':
break
client_socket.close()
server.close()
三 多任务
import time
def sing():
print('唱歌')
time.sleep(4) # 停止4秒 模拟程序执行4秒
print('唱歌2')
def dance():
print('跳舞')
def tanzou():
print('弹吉他')
sing()
dance()
tanzou()
使用多进程实现多任务
# 多进程实现多任务
import time
# 使用模块方法创建多个进程
from multiprocessing import Process
def sing():
print('唱歌')
time.sleep(4) # 停止4秒 模拟程序执行4秒
print('唱歌2')
def dance():
print('跳舞')
def tanzou():
print('弹吉他')
if __name__ == '__main__':
# 创建进程
# 创建不同的进程执行不同的任务
p1 = Process(target=sing)
p2 = Process(target=dance)
p3 = Process(target=tanzou)
# 执行进程
p1.start()
p2.start()
p3.start()
任务中的参数传递
# 多进程实现多任务
import time
# 使用模块方法创建多个进程
from multiprocessing import Process
def sing(username,singname):
print(f'唱{username}的{singname}歌')
def dance(name):
print(f'跳{name}舞')
def tanzou():
print('弹吉他')
if __name__ == '__main__':
# 创建进程
# 创建不同的进程执行不同的任务
# 传递参数的两种方式
p1 = Process(target=sing,kwargs={'username':'周杰伦','singname':'稻香'})
p2 = Process(target=dance,args=['霹雳'])
p3 = Process(target=tanzou)
# 执行进程
p1.start()
p2.start()
p3.start()