0
点赞
收藏
分享

微信扫一扫

Python高级软件技术(二)

佳简诚锄 2022-03-25 阅读 74

P1 网络并发

1. 并发网络通信模型

1.1 常见模型分类

1.1.1 循环服务器模型

  1. 定义: 循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再处理下一个。
  2. 优点: 实现简单,占用资源少
  3. 缺点: 无法同时处理多个客户端请求
  4. 适用情况: 处理的任务可以很快完成,客户端无需长期占用服务端程序。udp比tcp更适合循环。

1.1.2 IO并发模型

  1. 定义:利用IO多路复用,异步IO等技术,同时处理多个客户端IO请求
  2. 优点:消耗资源少,能同时高效处理多个IO行为
  3. 缺点:只能处理并发产生的IO事件,无法处理CPU计算
  4. 适用情况:HTTP请求,网络传输等都是IO行为

1.1.3 多进程/线程网络并发模型

  1. 定义:每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,
    客户端退出时在销毁该进程/线程。
  2. 优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求
  3. 缺点:资源消耗大
  4. 适用情况:客户端同时连接量较少,需要处理行为较复杂情况

1.2 基于fork的多进程网络并发模型

1.2.1 实现步骤

  1. 创建监听套接字
  2. 等待接收客户端请求
  3. 客户端连接创建新的进程处理客户端请求
  4. 原进程继续等待其他客户端连接
  5. 如果客户端退出,则销毁对应的进程

1.2.2 代码实现

"""
fork_server.py  基于fork的多进程并发

    1.创建监听套接字
    2.等待接收客户端请求
    3.客户端连接创建新的进程处理客户端请求
    4.原进程继续等待其他客户端连接
    5.如果客户端退出,则销毁对应的进程
"""

from socket import *
import os
import signal

# 全局变量
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST, PORT)

# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
s.bind(ADDR)
s.listen(5)

# 处理僵尸进程
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
print("Listen the port 8888...")


# 具体处理客户端请求
def handle(c):
    while True:
        data = c.recv(1024)
        if not data:
            break
        print(data.decode())
        c.send(b'OK')
    c.close()


while True:  # 循环处理客户端连接
    try:
        c, addr = s.accept()
        print("Connect from", addr)
    except KeyboardInterrupt:
        os._exit(0)
    except Exception as e:
        print(e)
        continue

    # 创建子进程处理客户端事务
    pid = os.fork()
    if pid == 0:
        s.close()
        handle(c)  # 处理具体事务
        os._exit(0)  # 子进程销毁
    # 无论父进程还是fork出错都继续处理连接
    else:
        c.close()  # 父进程不需要和客户端通信
"""
thread_server.py    基于threading多线程并发

    1.创建监听套接字
    2.等待接收客户端请求
    3.客户端连接创建新的进程处理客户端请求
    4.原进程继续等待其他客户端连接
    5.如果客户端退出,则销毁对应的进程
"""

from socket import *
from threading import Thread
import sys

# 设置全局变量
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST, PORT)

# 创建套接字
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
s.bind(ADDR)
s.listen(5)

print("Listen the port 8888...")


# 处理具体客户端请求
def handle(c):
    while True:
        data = c.recv(1024)
        if not data:
            break
        print(data.decode())
        c.send(b'OK')
    c.close()


# 循环等待客户端连接
while True:
    try:
        c, addr = s.accept()
        print("Connect from:", addr)
    except KeyboardInterrupt:
        sys.exit("退出服务器")
    except Exception as e:
        print(e)
        continue

    # 创建线程处理请求
    t = Thread(target=handle, args=(c,))
    t.setDaemon(True)
    t.start()

1.3 IO并发

1.3.1 IO分类

  • 阻塞IO
  • 非阻塞IO
  • IO多路复用
  • 异步IO

1.3.2 阻塞IO

  1. 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态
  2. 效率:阻塞IO是效率很低的一种IO。但是由于逻辑简单所以是默认IO行为
  3. 阻塞情况
      因为某种执行条件没有满足造成的函数阻塞 # accept input recv
      处理IO的时间较长产生的阻塞状态 # 网络传输,大文件读写

1.3.3 非阻塞IO

定义: 通过修改IO属性行为,使原本阻塞的IO变为非阻塞状态

sockfd.setblocking(bool)
# 功能:设置套接字为非阻塞IO
# 参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞

# 超时检测:设置一个最长阻塞时间,超过该时间后则不再阻塞等待
sockfd.settimeout(sec)
# 功能:设置套接字的超时时间
# 参数:设置的时间
"""
block_io.py
socket  套接字非阻塞示例
"""

from socket import *
from time import ctime, sleep

# 日志文件
f = open('log.txt', 'a+')

# tcp套接字
sockfd = socket()
sockfd.bind(('127.0.0.1', 8888))
sockfd.listen(3)

# 设置套接字为非阻塞
# sockfd.setblocking(False)

# 设置超时检测
sockfd.settimeout(3)  # sockfd调用的所有函数最多阻塞3秒

while True:
    print("Waiting for connect...")
    # 没有客户端连接每隔3秒写一条日志
    try:
        connfd, addr = sockfd.accept()
    except (BlockingIOError, timeout) as e:
        sleep(3)
        f.write("%s : %s\n" % (ctime(), e))
        f.flush()
    else:
        print("Connect from", addr)
        data = connfd.recv(1024).decode()
        print(data)

1.4 IO多路复用

1.4.1 定义

同时监控多个IO事件,当哪个IO时间准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率

1.4.2 select方法

rs, ws, xs = select(rlist, wlist, xlist[,timeout])
# 功能:监控IO事件,阻塞等待IO发生
# 参数:
    # rlist  列表    存放关注的等待发生的IO事件
    # wlist  列表    存放关注的要主动处理的IO事件
    # xlist  列表    存放关注的出现异常要处理的IO
    # timeout    超时时间
# 返回值:
    # rs  列表    rlist中准备就绪的IO 
    # ws  列表    wlist中准备就绪的IO
    # xs  列表    xlist中准备就绪的IO

select实现tcp服务

  1. 将关注的IO放入对应的监控类别列表
  2. 通过select函数进行监控
  3. 遍历select返回值列表,确定就绪IO事件
  4. 处理发生的IO事件

注意:

  • wlist中如果存在IO事件,则select立即返回给ws
  • 处理IO过程中不要出现死循环占有服务端的情况
  • IO多路复用消耗资源较少,效率较高

1.4.3 poll方法

p = select.poll()
# 功能:创建poll对象
# 返回值:poll对象

p.register(fd,event)
# 功能:注册关注的IO事件
# 参数:
    # fd    要关注的IO
    # event   要关注的IO事件类型
        # 常用类型:
            #POLLIN  读IO事件 (rlist)
            #POLLOUT 写IO事件 (wlist)
            #POLLERR 异常IO   (xlist)
            #POLLHUP 断开连接
# e.g.    p.register(sockfd, POLLIN|POLLERR)

p.unregister(fd)
# 功能:取消对IO的关注
# 参数:IO对象或者IO对象的fileno

events = p.poll()
# 功能:阻塞等待监控的IO事件发生
# 返回值:
    # 返回发生的IO
    # events格式  [(fileno,event),()...]
    # 每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型

poll_server步骤

  1. 创建套接字
  2. 将套接字register
  3. 创建查找字典并维护
  4. 循环监控IO发生
  5. 处理发生的IO
"""
poll_server.py  完成tcp并发服务
思路分析:
    IO多路复用实现并发
    建立fileno-->io对象字典用于IO查找
步骤
    创建套接字
    将套接字register
    创建查找字典并维护
    循环监控IO发生
    处理发生的IO
"""

from socket import *
from select import *

# 创建监听套接字,作为关注的IO
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
s.bind(('0.0.0.0', 8888))
s.listen(3)

# 创建poll对象
p = poll()

# 建立查找字典,通过一个IO的fileno找到IO对象
fd_map = {s.fileno(): s}

# 关注s
p.register(s, POLLIN | POLLERR)

# 循环监控IO发生
while True:
    events = p.poll()
    # 循环遍历列表,查看哪个IO就绪,进行处理
    for fd, event in events:
        # 区分哪个IO就绪
        if fd == s.fileno():
            c, addr = fd_map[fd].accept()
            print("Connect from", addr)
            # 关注客户端连接套接字
            p.register(c, POLLIN | POLLERR)
            fd_map[c.fileno()] = c  # 维护字典
        elif event & POLLIN:  # 判断POLLIN就绪
            data = fd_map[fd].recv(1024).decode()
            if not data:
                p.unregister(fd)  # 取消关注
                fd_map[fd].close()
                del fd_map[fd]
                continue
            print(data)
            fd_map[fd].send(b'OK')

1.4.4 epoll方法

  1. 使用方法
  • 基本与poll相同
  • 生成对象改为epoll()
  • 将所有事件类型改为EPOLL类型
  1. epoll特点
  • epoll效率比select poll要高
  • epoll监控IO数量比select要多
  • epoll的触发方式比poll要多(EPOLLET边缘触发)
  • epoll_server.py

1.5 协程技术

  1. 定义:纤程,微线程。是允许在不同入口点不同位置暂停或开始的计算机程序,简单来说,协程就是可以暂停执行的函数
  2. 协程原理:记录一个函数的上下文,协程调度切换时会将记录的上下文保存,在切换回来时进行调取,恢复原有的执行内容,以便从上一次执行位置继续执行
  3. 协程优缺点
    优点
     1. 协程完成多任务占用计算资源很少
     2. 由于协程的多任务切换在应用层完成,因此切换开销少
     3. 协程为单线程程序,无需进行共享资源同步互斥处理
    缺点:协程本质是一个单线程,无法利用计算机多核资源

扩展延伸@标准库协程的实现
  python3.5以后,使用标准库asyncio和async/await语法来编写并发代码。asyncio库通过对异步IO行为的支持完成python的协程。虽然官方说asyncio是未来的开发方向,但是由于其生态不够丰富,大量的客户端不支持awaitable需要自己去封装,所以在使用上存在缺陷。更多时候只能使用已有的异步库(asyncio等)功能有限

第三方协程模块

  1. greenlet模块
    安装:pip install greenlet
greenlet.greenlet(func)
# 功能:创建协程对象
# 参数:协程函数

g.switch()
# 功能:选择要执行的协程函数
  1. gevent模块
    安装pip install gevent
gevent.spawn(func,argv)
# 功能:生成协程对象
# 参数:
    # func 协程函数
    # argv 给协程函数传参(不定参)
# 返回值:协程对象

gevent.joinall(list,[timeout])
# 功能:阻塞等待协程执行完毕
# 参数:
    # list 协程对象列表
    # timeout 超时时间

gevent.sleep(sec)
# 功能:gevent睡眠阻塞
# 参数:睡眠时间
  • gevent协程只有在遇到gevent指定的阻塞行为时才会自动在协程之间跳转
    如gevent.joinall(),gevent.sleep()带来的阻塞

monkey脚本

  • 作用:在gevent协程中,协程只有遇到gevent指定类型的阻塞才能跳转到其他协程,
    因此,我们希望将普通的IO阻塞行为转换为可以触发gevent协程跳转的阻塞,以提
    高执行效率
  • 转换方法:gevent提供了一个脚本程序monkey,可以修改底层解释IO阻塞行为,将
    很多普通阻塞转换为gevent阻塞
  • 使用方法:
    1. 导入monkey
    from gevent import monkey
    2. 运行相应的脚本,例如转换socket中所有阻塞
    monkey.patch_socket()
    3. 如果将所有可转换的IO阻塞全部转换则运行all
    monkey.patch_all()
    4. 注意:脚本运行函数需要在对应模块导入前执行
举报

相关推荐

0 条评论