消息队列
队列:先进先出
堆栈:先进后出
from multiprocessing import Queue
q = Queue(5) # 自定义队列的长度
# 朝队列中存放数据
q.put(111)
q.put(222)
q.put(333)
print(q.full()) # False 判断队列是否满了
q.put(444)
q.put(555)
print(q.full()) # True
# q.put(666) # 超出最大长度 原地阻塞等待队列中出现空位
print(q.get())
print(q.get())
print(q.empty()) # False 判断队列是否空了
print(q.get())
print(q.get())
print(q.get())
print(q.empty()) # True
# print(q.get()) # 队列中没有值 继续获取则阻塞等待队列中给值
print(q.get_nowait()) # 队列中如果没有值 直接报错
"""
full()
empty()
get_nowait()
上述方法能否在并发的场景下精准使用???
不能用!!!
之所以介绍队列是因为它可以支持进程间数据通信
"""
IPC 机制(进程间通信)
1.主进程与子进程数据交互
2.两个子进程数据交互
本质:不同内存空间中的进程数据交互
生产者消费者模型
生产者与消费者是什么模型
生产者
负责生产/制作数据
消费者
负责消费/处理数据
生产者和消费者彼此之间不直接通讯,而是通过队列来进行通讯,生产者生产完的数据不用等待消费者处理,直接放到队列中,消费者不找消费要数据,而是直接从队列中取,队列就相当于一个缓冲区,平衡生产者和消费者的处理能力。
模型的三要素
1.生产者
2.消费者
3.队列(缓冲区)
通过代码来实现一下生产者消费者模型
from queue import Queue
import threading
import time
#创建队列
q=Queue(10)
# 定义一个生产者
def producer(name):
#生产计数
count=1
while True:
q.join()
q.put(count)
print("生产者%s正在生产第%d个产品"%(name,count))
count+=1
time.sleep(1)
#定义一个消费者
def customer(name):
count=1
while True:
hao_zi=q.get()
print("消费者%s正在消耗第%d个商品"%(name,hao_zi))
count+=1
q.task_done()
if __name__ == '__main__':
t1=threading.Thread(target=producer,args=("A", ))
t2=threading.Thread(target=customer,args=("a", ))
t1.start()
t2.start()
我们加上 time.sleep()这样可以清晰的看到生产者生产一个通过put()往队列中放一个,消费者消费一个通过get()取出一个 ,当当生产者生产完,消费者通过get()取空之后,就一直在原地等待。
from queue import Queue
import threading
import time
#创建队列
q=Queue(10)
# 定义一个生产者
def producer(name):
#生产计数
count=1
while True:
q.join()
q.put(count)
print("生产者%s正在生产第%d个产品"%(name,count))
count+=1
#定义一个消费者
def customer(name):
count=1
while True:
hao_zi=q.get()
print("消费者%s正在消耗第%d个商品"%(name,hao_zi))
count+=1
q.task_done()
time.sleep(1)
if __name__ == '__main__':
t1=threading.Thread(target=producer,args=("A", ))
t2=threading.Thread(target=customer,args=("a", ))
t1.start()
t2.start()
当生产者生产完通过put()放在队列中就等着消费者通过get()取出
生产者与消费者模型通过队列平衡生产力与消费力,就是生产者一直不停的生产,消费者可以不停的消费,生产者与消费者不直接沟通。
线程理论
什么是线程
开设线程的两种方式
第一种:
from threading import Thread
import time
def task(name):
print('%s is running' % name)
time.sleep(0.5)
print('%s is over' % name)
# 开启线程不需要在main下面执行,直接书写就可以
# 但我们还是习惯性的写在main的下面
t = Thread(target=task,args=('juson',))
t.start()
print('主') # 注意: 会发现运行的结果先运行了task里面的程序和进程不一样(先运行下面的主 因为子进程需要创建内存)
# 线程而是在进程内存里面开创的(所以创建线程开销非常小 几乎是代码一执行start线程就已经创建了)
第二种:
from threading import Thread
import time
class MyThread(Thread):
def __init__(self,name): # 重写了父类thread的方法又不知道别人的方法里有啥 你就调用父类的方法
super().__init__()
self.name = name
def run(self):
print('%s is running ' % self.name)
time.sleep(1)
print('%s DSB' % self.name)
if __name__ == '__main__':
t = MyThread('juson')
t.start()
print('主')
线程实现TCP服务端开发
import socket
from threading import Thread
from multiprocessing import Process
"""
服务端
1.要有固定的IP和PORT
2.24小时不间断提供服务
3.能够支持并发
"""
# 将服务的代码单独封装成一个函数
def talk(conn):
# 通信循环
while True:
try:
data = conn.recv(1024)
# 针对mac linux 客户端断开链接后
if len(data) == 0: break
print(data.decode('utf-8'))
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
server =socket.socket() # 括号内不加参数默认就是TCP协议
server.bind(('127.0.0.1',8080))
server.listen(5)
# 链接循环
while True:
conn, addr = server.accept() # 接客
# 叫其他人来服务客户
# t = Thread(target=talk,args=(conn,))
t = Process(target=talk,args=(conn,))
t.start()
客户端:
import socket
client = socket.socket()
client.connect(('127.0.0.1',8080))
while True:
client.send(b'hello world')
data = client.recv(1024)
print(data.decode('utf-8'))
线程join方法
from threading import Thread
import time
def task(name):
print('%s is running'%name)
time.sleep(3)
print('%s is over'%name)
if __name__ == '__main__':
t = Thread(target=task,args=('jason',))
t.start()
t.join() # 主线程等待子线程运行结束再执行
print('主')
线程间数据共享
from threading import Thread
import time
money = 100
def task():
global money
money = 666
print(money)
if __name__ == '__main__':
t = Thread(target=task)
t.start()
t.join()
print(money)
线程对象属性和方法
守护线程
# from threading import Thread
# import time
#
#
# def task(name):
# print('%s is running'%name)
# time.sleep(1)
# print('%s is over'%name)
#
#
# if __name__ == '__main__':
# t = Thread(target=task,args=('json',))
# t.daemon = True
# t.start()
# print('主')
"""
主线程运行结束之后不会立刻结束 会等待所有其他非守护线程结束才会结束(也就是比主线程慢还在运行的子线程)
因为主线程的结束意味着所在的进程的结束
"""
GIL全局解释器锁
一、GIL 全局解释器锁
翻译结果:
重点:
二、为什么会有 GIL 锁?
Python 是一门解释型的语言,这就意味着代码是解释一行,运行一行,它并不清楚代码全局;
因此,每个线程在调用 cpython 解释器 在运行之前,需要先抢到 GIL 锁,然后才能运行。
编译型的语言就不会存在 GIL 锁,编译型的语言会直接编译所有代码,就不会出现这种问题。
三、GIL 锁与普通锁的区别
启动 10 个线程 去修改同一个变量–number
方案一:在线程运行的函数中加入 time.sleep(0.1),而且没有加入数据的互斥锁– 结果为 9 而不是 0,运行时间为 0.1 秒左右
import time
from threading import Thread,Lock
# mutex = Lock()
number = 10
def func():
global number
tem = number
time.sleep(0.1)
number = tem -1
if __name__ == '__main__':
thread_list = []
for i in range(10):
thread = Thread(target=func)
thread.start()
thread_list.append(thread)
for i in thread_list:
i.join()
print(number)
方案二:在线程运行的函数中 删除 time.sleep(0.1),而且没有加入数据的互斥锁– 结果为 0,运行时间为 0.1 秒左右
import time
from threading import Thread,Lock
# mutex = Lock()
number = 10
def func():
global number
tem = number
# time.sleep(0.1)
number = tem -1
if __name__ == '__main__':
thread_list = []
for i in range(10):
thread = Thread(target=func)
thread.start()
thread_list.append(thread)
for i in thread_list:
i.join()
print(number)
那么,为什么注释了一行代码,结果却有差距呢?
因为 time.sleep(0.1) 所代表的的操作是 IO 操作,当线程运行到这一行代码的时候,要进行 IO 操作,需要释放 CPU 资源,也就是说,线程这个时候需要等待 IO操作结束,此时线程就会处于"阻塞态"可以简单理解为这个线程什么活都不干了,就等着 IO结束,那么这个线程不干活了,其他线程要干活啊,所以,其他的线程就会获取 GIL 锁,再运行代码。
如果没有 time.sleep(0.1) 所代表的的操作是 IO 操作,线程就会一直运行到结束,才释放 GIL 锁,其他的线程才能获取 GIL 锁,运行代码;
那么,反映在这个例子中,第一个线程获取 GIL 锁以后,会直接运行到最后一步,修改了 number = tem -1=10-1=9;
下一个线程,再获取到的 number 就是 9 了,一次类推,结果为 0.
方案三:在方案一的基础上再加入数据的互斥锁– 结果为 0, 但是运行时间为 1 秒
import time
from threading import Thread,Lock
mutex = Lock()
number = 10
def func(mutex):
mutex.acquire()
global number
tem = number
time.sleep(0.1)
number = tem -1
mutex.release()
if __name__ == '__main__':
thread_list = []
for i in range(10):
thread = Thread(target=func,args=(mutex,))
thread.start()
thread_list.append(thread)
for i in thread_list:
i.join()
print(number)
运行结果的区别在于,第一个线程在运行时,先获取 GIL 锁,然后对 number数据进行上锁,运行到 time.sleep(0.1) 线程变成"阻塞态",释放 GIL 锁;
第二个进程获取 GIL 锁,但是由于 number数据已经被上锁了,无法操作,只能变成"阻塞态",释放 GIL 锁;
其他线程也是一样,当第一个线程 time.sleep(0.1) 运行结束之后(IO 操作结束) ,第一个线程会变成"就绪态",那么就可以获得 GIL 锁,继续运行,直到最终修好 number ,释放掉互斥锁,线程运行结束,然后释放 GIL 锁;
也就是说,当第一个进程运行 time.sleep(0.1) 的时候,其他的线程什么都做不了,只能干等;
当 数据的互斥锁被释放之后,其他的线程就可以获取互斥锁,那么其他的线程都会变成"就绪态",但是只有一个线程能够获取 (服从 CPU 的调度算法,这里不进行拓展了),能够获取互斥锁的线程会对 GIL 锁,互斥锁进行上锁,再运行… …
所以最终的结果是 0,但是运行时间是 每个线程的运行时间相加。
3.1 为什么要再加上数据锁?
四、多线程无法利用多核优势?
在 4核及以上 的情况下:
多进程:进程进行切换需要消耗大量资源
多线程:线程进行切换并不需要消耗大量资源