0
点赞
收藏
分享

微信扫一扫

【python线程与GIL 锁】

老牛走世界 2022-04-21 阅读 85

消息队列

队列:先进先出

堆栈:先进后出

from multiprocessing import Queue


q = Queue(5)  # 自定义队列的长度
# 朝队列中存放数据
q.put(111)
q.put(222)
q.put(333)
print(q.full())  # False  判断队列是否满了
q.put(444)
q.put(555)
print(q.full())  # True
# q.put(666)  # 超出最大长度 原地阻塞等待队列中出现空位
print(q.get())
print(q.get())
print(q.empty())  # False  判断队列是否空了
print(q.get())
print(q.get())
print(q.get())
print(q.empty())  # True
# print(q.get())  # 队列中没有值 继续获取则阻塞等待队列中给值
print(q.get_nowait())  # 队列中如果没有值 直接报错
"""
    full()
    empty()
    get_nowait()
上述方法能否在并发的场景下精准使用???
    不能用!!!
    
之所以介绍队列是因为它可以支持进程间数据通信
"""

IPC 机制(进程间通信)

1.主进程与子进程数据交互
2.两个子进程数据交互
本质:不同内存空间中的进程数据交互
 


生产者消费者模型

生产者与消费者是什么模型

生产者
负责生产/制作数据

消费者

负责消费/处理数据

生产者和消费者彼此之间不直接通讯,而是通过队列来进行通讯,生产者生产完的数据不用等待消费者处理,直接放到队列中,消费者不找消费要数据,而是直接从队列中取,队列就相当于一个缓冲区,平衡生产者和消费者的处理能力。

img

模型的三要素

1.生产者
2.消费者
3.队列(缓冲区)

通过代码来实现一下生产者消费者模型

from queue import Queue
import threading
import time
#创建队列
q=Queue(10)
# 定义一个生产者
def producer(name):
    #生产计数
    count=1
    while True:
        q.join()
        q.put(count)
        print("生产者%s正在生产第%d个产品"%(name,count))
        count+=1
        time.sleep(1)

#定义一个消费者
def customer(name):
    count=1
    while True:
        hao_zi=q.get()
        print("消费者%s正在消耗第%d个商品"%(name,hao_zi))
        count+=1
        q.task_done()

if __name__ == '__main__':
    t1=threading.Thread(target=producer,args=("A", ))
    t2=threading.Thread(target=customer,args=("a", ))
    t1.start()
    t2.start()

我们加上 time.sleep()这样可以清晰的看到生产者生产一个通过put()往队列中放一个,消费者消费一个通过get()取出一个 ,当当生产者生产完,消费者通过get()取空之后,就一直在原地等待。

from queue import Queue
import threading
import time
#创建队列
q=Queue(10)
# 定义一个生产者
def producer(name):
    #生产计数
    count=1
    while True:
        q.join()
        q.put(count)
        print("生产者%s正在生产第%d个产品"%(name,count))
        count+=1

#定义一个消费者
def customer(name):
    count=1
    while True:
        hao_zi=q.get()
        print("消费者%s正在消耗第%d个商品"%(name,hao_zi))
        count+=1
        q.task_done()
        time.sleep(1)


if __name__ == '__main__':
    t1=threading.Thread(target=producer,args=("A", ))
    t2=threading.Thread(target=customer,args=("a", ))
    t1.start()
    t2.start()

当生产者生产完通过put()放在队列中就等着消费者通过get()取出

生产者与消费者模型通过队列平衡生产力与消费力,就是生产者一直不停的生产,消费者可以不停的消费,生产者与消费者不直接沟通。

线程理论

什么是线程

开设线程的两种方式

第一种:

from threading import Thread
import time

def task(name):
    print('%s is running' % name)
    time.sleep(0.5)
    print('%s is over' % name)

# 开启线程不需要在main下面执行,直接书写就可以
# 但我们还是习惯性的写在main的下面

t = Thread(target=task,args=('juson',))
t.start()

print('主')  # 注意: 会发现运行的结果先运行了task里面的程序和进程不一样(先运行下面的主 因为子进程需要创建内存)
                     # 线程而是在进程内存里面开创的(所以创建线程开销非常小 几乎是代码一执行start线程就已经创建了)

第二种:

from threading import Thread
import time

class MyThread(Thread):

    def __init__(self,name): # 重写了父类thread的方法又不知道别人的方法里有啥 你就调用父类的方法
        super().__init__()
        self.name = name


    def run(self):
        print('%s is running ' % self.name)
        time.sleep(1)
        print('%s DSB' % self.name)

if __name__ == '__main__':
    t = MyThread('juson')
    t.start()
    print('主')

线程实现TCP服务端开发

import socket
from threading import Thread
from multiprocessing import Process
"""
服务端
    1.要有固定的IP和PORT
    2.24小时不间断提供服务
    3.能够支持并发
  
"""

# 将服务的代码单独封装成一个函数
def talk(conn):
    # 通信循环
    while True:
        try:
            data = conn.recv(1024)
            # 针对mac linux 客户端断开链接后
            if len(data) == 0: break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()

server =socket.socket()  # 括号内不加参数默认就是TCP协议
server.bind(('127.0.0.1',8080))
server.listen(5)


# 链接循环
while True:
    conn, addr = server.accept()  # 接客
    # 叫其他人来服务客户
    # t = Thread(target=talk,args=(conn,))
    t = Process(target=talk,args=(conn,))
    t.start()

客户端:

import socket


client = socket.socket()
client.connect(('127.0.0.1',8080))

while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data.decode('utf-8'))

线程join方法

from threading import Thread
import time


def task(name):
    print('%s is running'%name)
    time.sleep(3)
    print('%s is over'%name)


if __name__ == '__main__':
    t = Thread(target=task,args=('jason',))
    t.start()
    t.join()  # 主线程等待子线程运行结束再执行
    print('主')

线程间数据共享

from threading import Thread
import time


money = 100


def task():
    global money
    money = 666
    print(money)


if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()
    print(money)

线程对象属性和方法

守护线程

# from threading import Thread
# import time
#
#
# def task(name):
#     print('%s is running'%name)
#     time.sleep(1)
#     print('%s is over'%name)
#
#
# if __name__ == '__main__':
#     t = Thread(target=task,args=('json',))
#     t.daemon = True
#     t.start()
#     print('主')

"""
主线程运行结束之后不会立刻结束 会等待所有其他非守护线程结束才会结束(也就是比主线程慢还在运行的子线程)
    因为主线程的结束意味着所在的进程的结束
"""

GIL全局解释器锁

一、GIL 全局解释器锁

翻译结果:

重点:

二、为什么会有 GIL 锁?
Python 是一门解释型的语言,这就意味着代码是解释一行,运行一行,它并不清楚代码全局;

因此,每个线程在调用 cpython 解释器 在运行之前,需要先抢到 GIL 锁,然后才能运行。

编译型的语言就不会存在 GIL 锁,编译型的语言会直接编译所有代码,就不会出现这种问题。

三、GIL 锁与普通锁的区别

启动 10 个线程 去修改同一个变量–number

方案一:在线程运行的函数中加入 time.sleep(0.1),而且没有加入数据的互斥锁– 结果为 9 而不是 0,运行时间为 0.1 秒左右

import time
from threading import Thread,Lock


# mutex = Lock()
number = 10

def func():
    global number
    tem = number
    time.sleep(0.1)  
    number = tem -1

if __name__ == '__main__':

    thread_list = []
    for i in range(10):
        thread = Thread(target=func)
        thread.start()
        thread_list.append(thread)
    for i in thread_list:
        i.join()

    print(number)

方案二:在线程运行的函数中 删除 time.sleep(0.1),而且没有加入数据的互斥锁– 结果为 0,运行时间为 0.1 秒左右

import time
from threading import Thread,Lock


# mutex = Lock()
number = 10

def func():
    global number
    tem = number
    # time.sleep(0.1)  
    number = tem -1

if __name__ == '__main__':

    thread_list = []
    for i in range(10):
        thread = Thread(target=func)
        thread.start()
        thread_list.append(thread)
    for i in thread_list:
        i.join()

    print(number)

那么,为什么注释了一行代码,结果却有差距呢?

因为 time.sleep(0.1) 所代表的的操作是 IO 操作,当线程运行到这一行代码的时候,要进行 IO 操作,需要释放 CPU 资源,也就是说,线程这个时候需要等待 IO操作结束,此时线程就会处于"阻塞态"可以简单理解为这个线程什么活都不干了,就等着 IO结束,那么这个线程不干活了,其他线程要干活啊,所以,其他的线程就会获取 GIL 锁,再运行代码。

如果没有 time.sleep(0.1) 所代表的的操作是 IO 操作,线程就会一直运行到结束,才释放 GIL 锁,其他的线程才能获取 GIL 锁,运行代码;

那么,反映在这个例子中,第一个线程获取 GIL 锁以后,会直接运行到最后一步,修改了 number = tem -1=10-1=9;

下一个线程,再获取到的 number 就是 9 了,一次类推,结果为 0.

方案三:在方案一的基础上再加入数据的互斥锁– 结果为 0, 但是运行时间为 1 秒

import time
from threading import Thread,Lock


mutex = Lock()
number = 10

def func(mutex):
    mutex.acquire()
    global number
    tem = number
    time.sleep(0.1)
    number = tem -1
    mutex.release()

if __name__ == '__main__':

    thread_list = []
    for i in range(10):
        thread = Thread(target=func,args=(mutex,))
        thread.start()
        thread_list.append(thread)
    for i in thread_list:
        i.join()

    print(number)

运行结果的区别在于,第一个线程在运行时,先获取 GIL 锁,然后对 number数据进行上锁,运行到 time.sleep(0.1) 线程变成"阻塞态",释放 GIL 锁;

第二个进程获取 GIL 锁,但是由于 number数据已经被上锁了,无法操作,只能变成"阻塞态",释放 GIL 锁;

其他线程也是一样,当第一个线程 time.sleep(0.1) 运行结束之后(IO 操作结束) ,第一个线程会变成"就绪态",那么就可以获得 GIL 锁,继续运行,直到最终修好 number ,释放掉互斥锁,线程运行结束,然后释放 GIL 锁;

也就是说,当第一个进程运行 time.sleep(0.1) 的时候,其他的线程什么都做不了,只能干等;

当 数据的互斥锁被释放之后,其他的线程就可以获取互斥锁,那么其他的线程都会变成"就绪态",但是只有一个线程能够获取 (服从 CPU 的调度算法,这里不进行拓展了),能够获取互斥锁的线程会对 GIL 锁,互斥锁进行上锁,再运行… …

所以最终的结果是 0,但是运行时间是 每个线程的运行时间相加。
3.1 为什么要再加上数据锁?

四、多线程无法利用多核优势?

在 4核及以上 的情况下:

多进程:进程进行切换需要消耗大量资源

多线程:线程进行切换并不需要消耗大量资源

举报

相关推荐

0 条评论