P1 网络并发
1. 并发网络通信模型
1.1 常见模型分类
1.1.1 循环服务器模型
- 定义: 循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再处理下一个。
- 优点: 实现简单,占用资源少
- 缺点: 无法同时处理多个客户端请求
- 适用情况: 处理的任务可以很快完成,客户端无需长期占用服务端程序。udp比tcp更适合循环。
1.1.2 IO并发模型
- 定义:利用IO多路复用,异步IO等技术,同时处理多个客户端IO请求
- 优点:消耗资源少,能同时高效处理多个IO行为
- 缺点:只能处理并发产生的IO事件,无法处理CPU计算
- 适用情况:HTTP请求,网络传输等都是IO行为
1.1.3 多进程/线程网络并发模型
- 定义:每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,
客户端退出时在销毁该进程/线程。 - 优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求
- 缺点:资源消耗大
- 适用情况:客户端同时连接量较少,需要处理行为较复杂情况
1.2 基于fork的多进程网络并发模型
1.2.1 实现步骤
- 创建监听套接字
- 等待接收客户端请求
- 客户端连接创建新的进程处理客户端请求
- 原进程继续等待其他客户端连接
- 如果客户端退出,则销毁对应的进程
1.2.2 代码实现
"""
fork_server.py 基于fork的多进程并发
1.创建监听套接字
2.等待接收客户端请求
3.客户端连接创建新的进程处理客户端请求
4.原进程继续等待其他客户端连接
5.如果客户端退出,则销毁对应的进程
"""
from socket import *
import os
import signal
# 全局变量
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST, PORT)
# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
s.bind(ADDR)
s.listen(5)
# 处理僵尸进程
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
print("Listen the port 8888...")
# 具体处理客户端请求
def handle(c):
while True:
data = c.recv(1024)
if not data:
break
print(data.decode())
c.send(b'OK')
c.close()
while True: # 循环处理客户端连接
try:
c, addr = s.accept()
print("Connect from", addr)
except KeyboardInterrupt:
os._exit(0)
except Exception as e:
print(e)
continue
# 创建子进程处理客户端事务
pid = os.fork()
if pid == 0:
s.close()
handle(c) # 处理具体事务
os._exit(0) # 子进程销毁
# 无论父进程还是fork出错都继续处理连接
else:
c.close() # 父进程不需要和客户端通信
"""
thread_server.py 基于threading多线程并发
1.创建监听套接字
2.等待接收客户端请求
3.客户端连接创建新的进程处理客户端请求
4.原进程继续等待其他客户端连接
5.如果客户端退出,则销毁对应的进程
"""
from socket import *
from threading import Thread
import sys
# 设置全局变量
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST, PORT)
# 创建套接字
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
s.bind(ADDR)
s.listen(5)
print("Listen the port 8888...")
# 处理具体客户端请求
def handle(c):
while True:
data = c.recv(1024)
if not data:
break
print(data.decode())
c.send(b'OK')
c.close()
# 循环等待客户端连接
while True:
try:
c, addr = s.accept()
print("Connect from:", addr)
except KeyboardInterrupt:
sys.exit("退出服务器")
except Exception as e:
print(e)
continue
# 创建线程处理请求
t = Thread(target=handle, args=(c,))
t.setDaemon(True)
t.start()
1.3 IO并发
1.3.1 IO分类
- 阻塞IO
- 非阻塞IO
- IO多路复用
- 异步IO
1.3.2 阻塞IO
- 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态
- 效率:阻塞IO是效率很低的一种IO。但是由于逻辑简单所以是默认IO行为
- 阻塞情况
因为某种执行条件没有满足造成的函数阻塞 # accept input recv
处理IO的时间较长产生的阻塞状态 # 网络传输,大文件读写
1.3.3 非阻塞IO
定义: 通过修改IO属性行为,使原本阻塞的IO变为非阻塞状态
sockfd.setblocking(bool)
# 功能:设置套接字为非阻塞IO
# 参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
# 超时检测:设置一个最长阻塞时间,超过该时间后则不再阻塞等待
sockfd.settimeout(sec)
# 功能:设置套接字的超时时间
# 参数:设置的时间
"""
block_io.py
socket 套接字非阻塞示例
"""
from socket import *
from time import ctime, sleep
# 日志文件
f = open('log.txt', 'a+')
# tcp套接字
sockfd = socket()
sockfd.bind(('127.0.0.1', 8888))
sockfd.listen(3)
# 设置套接字为非阻塞
# sockfd.setblocking(False)
# 设置超时检测
sockfd.settimeout(3) # sockfd调用的所有函数最多阻塞3秒
while True:
print("Waiting for connect...")
# 没有客户端连接每隔3秒写一条日志
try:
connfd, addr = sockfd.accept()
except (BlockingIOError, timeout) as e:
sleep(3)
f.write("%s : %s\n" % (ctime(), e))
f.flush()
else:
print("Connect from", addr)
data = connfd.recv(1024).decode()
print(data)
1.4 IO多路复用
1.4.1 定义
同时监控多个IO事件,当哪个IO时间准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率
1.4.2 select方法
rs, ws, xs = select(rlist, wlist, xlist[,timeout])
# 功能:监控IO事件,阻塞等待IO发生
# 参数:
# rlist 列表 存放关注的等待发生的IO事件
# wlist 列表 存放关注的要主动处理的IO事件
# xlist 列表 存放关注的出现异常要处理的IO
# timeout 超时时间
# 返回值:
# rs 列表 rlist中准备就绪的IO
# ws 列表 wlist中准备就绪的IO
# xs 列表 xlist中准备就绪的IO
select实现tcp服务
- 将关注的IO放入对应的监控类别列表
- 通过select函数进行监控
- 遍历select返回值列表,确定就绪IO事件
- 处理发生的IO事件
注意:
- wlist中如果存在IO事件,则select立即返回给ws
- 处理IO过程中不要出现死循环占有服务端的情况
- IO多路复用消耗资源较少,效率较高
1.4.3 poll方法
p = select.poll()
# 功能:创建poll对象
# 返回值:poll对象
p.register(fd,event)
# 功能:注册关注的IO事件
# 参数:
# fd 要关注的IO
# event 要关注的IO事件类型
# 常用类型:
#POLLIN 读IO事件 (rlist)
#POLLOUT 写IO事件 (wlist)
#POLLERR 异常IO (xlist)
#POLLHUP 断开连接
# e.g. p.register(sockfd, POLLIN|POLLERR)
p.unregister(fd)
# 功能:取消对IO的关注
# 参数:IO对象或者IO对象的fileno
events = p.poll()
# 功能:阻塞等待监控的IO事件发生
# 返回值:
# 返回发生的IO
# events格式 [(fileno,event),()...]
# 每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
poll_server步骤
- 创建套接字
- 将套接字register
- 创建查找字典并维护
- 循环监控IO发生
- 处理发生的IO
"""
poll_server.py 完成tcp并发服务
思路分析:
IO多路复用实现并发
建立fileno-->io对象字典用于IO查找
步骤
创建套接字
将套接字register
创建查找字典并维护
循环监控IO发生
处理发生的IO
"""
from socket import *
from select import *
# 创建监听套接字,作为关注的IO
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
s.bind(('0.0.0.0', 8888))
s.listen(3)
# 创建poll对象
p = poll()
# 建立查找字典,通过一个IO的fileno找到IO对象
fd_map = {s.fileno(): s}
# 关注s
p.register(s, POLLIN | POLLERR)
# 循环监控IO发生
while True:
events = p.poll()
# 循环遍历列表,查看哪个IO就绪,进行处理
for fd, event in events:
# 区分哪个IO就绪
if fd == s.fileno():
c, addr = fd_map[fd].accept()
print("Connect from", addr)
# 关注客户端连接套接字
p.register(c, POLLIN | POLLERR)
fd_map[c.fileno()] = c # 维护字典
elif event & POLLIN: # 判断POLLIN就绪
data = fd_map[fd].recv(1024).decode()
if not data:
p.unregister(fd) # 取消关注
fd_map[fd].close()
del fd_map[fd]
continue
print(data)
fd_map[fd].send(b'OK')
1.4.4 epoll方法
- 使用方法:
- 基本与poll相同
- 生成对象改为epoll()
- 将所有事件类型改为EPOLL类型
- epoll特点
- epoll效率比select poll要高
- epoll监控IO数量比select要多
- epoll的触发方式比poll要多(EPOLLET边缘触发)
- epoll_server.py
1.5 协程技术
- 定义:纤程,微线程。是允许在不同入口点不同位置暂停或开始的计算机程序,简单来说,协程就是可以暂停执行的函数
- 协程原理:记录一个函数的上下文,协程调度切换时会将记录的上下文保存,在切换回来时进行调取,恢复原有的执行内容,以便从上一次执行位置继续执行
- 协程优缺点
优点
1. 协程完成多任务占用计算资源很少
2. 由于协程的多任务切换在应用层完成,因此切换开销少
3. 协程为单线程程序,无需进行共享资源同步互斥处理
缺点:协程本质是一个单线程,无法利用计算机多核资源
扩展延伸@标准库协程的实现
python3.5以后,使用标准库asyncio和async/await语法来编写并发代码。asyncio库通过对异步IO行为的支持完成python的协程。虽然官方说asyncio是未来的开发方向,但是由于其生态不够丰富,大量的客户端不支持awaitable需要自己去封装,所以在使用上存在缺陷。更多时候只能使用已有的异步库(asyncio等)功能有限
第三方协程模块
- greenlet模块
安装:pip install greenlet
greenlet.greenlet(func)
# 功能:创建协程对象
# 参数:协程函数
g.switch()
# 功能:选择要执行的协程函数
- gevent模块
安装pip install gevent
gevent.spawn(func,argv)
# 功能:生成协程对象
# 参数:
# func 协程函数
# argv 给协程函数传参(不定参)
# 返回值:协程对象
gevent.joinall(list,[timeout])
# 功能:阻塞等待协程执行完毕
# 参数:
# list 协程对象列表
# timeout 超时时间
gevent.sleep(sec)
# 功能:gevent睡眠阻塞
# 参数:睡眠时间
- gevent协程只有在遇到gevent指定的阻塞行为时才会自动在协程之间跳转
如gevent.joinall(),gevent.sleep()带来的阻塞
monkey脚本
- 作用:在gevent协程中,协程只有遇到gevent指定类型的阻塞才能跳转到其他协程,
因此,我们希望将普通的IO阻塞行为转换为可以触发gevent协程跳转的阻塞,以提
高执行效率 - 转换方法:gevent提供了一个脚本程序monkey,可以修改底层解释IO阻塞行为,将
很多普通阻塞转换为gevent阻塞 - 使用方法:
1. 导入monkey
from gevent import monkey
2. 运行相应的脚本,例如转换socket中所有阻塞
monkey.patch_socket()
3. 如果将所有可转换的IO阻塞全部转换则运行all
monkey.patch_all()
4. 注意:脚本运行函数需要在对应模块导入前执行