0
点赞
收藏
分享

微信扫一扫

python中的线程


概念:

线程被称作轻量级进程。

与进程类似,不过它们是在同一个进程下执行的。

并且它们会共享相同的上下文。

当其他线程运行时,它可以被抢占(中断)

和临时挂起(也成为睡眠) — 让步

线程的轮训调度机制类似于进程的轮询调度。

只不过这个调度不是由操作系统来负责,而是由Python解释器来负责。

线程的创建与使用:

import threading
from threading import Thread
def func(a):
print(a)
t=threading.Thread(target=func,args=(1,),name="线程1")
t.start()
t.join()
print(t.is_alive()) #true是线程正在运行,false已经结束
print(t.getName())
print(t.daemon)#判断是否为守护进程
print(t.ident)
print(t.isDaemon())#判断是否为守护进程
t.setName("线程2")
print(t.name)

解释说明:


  • 导入模板​​threading,import threading​​ 或者 ​​from threading import Thread​
    ​- threading.Thread(target=func,args=(1,))​​ 第一个参数是要调度的函数,第二个参数是要传的参数,元祖,当然此方法中可以传的参数不止这些,还可以传线程的名字name,一个kwargs的字典,还有group(这个不知道是什么)
  • ​t.start()​​ 启动线程

使用多线程实现并发服务器

代码示例:

import socket
import threading
import sys
import os
def readable(conn,addr):
sys.stdin=os.fdopen(0)
while True:
data=conn.recv(1024)
if data:
print("客户端{}发来消息:{}".format(addr,data))#默认子线程会关闭input
msg=input(">>")
conn.send(msg.encode())
else:
break
conn.close()
if __name__ == '__main__':
server = socket.socket()
server.bind(("", 4321))
server.listen(5)
print("开始监听")
while True:
con,addr=server.accept()
print("有一个线程:客户端{}已连接".format(addr))
t=threading.Thread(target=readable,args=(con,addr))
t.start()

线程间的通信(消息队列)

因为线程属于同一个进程,因此它们之间共享内存区域。因此全局变量是公共的。所以线程间的通信没有进程那么麻烦。

线程与队列(专门用于线程间安全通信的队列)

线程中使用队列和进程中使用队列很相似。

总结一句就是

线程使用普通的queue,进程使用​​Manager().Queue()​

代码示意:

import queue
q=queue.Queue(2)
q.put(1)
q.put(2)
print(q.full())#True
print(q.qsize())
num=q.get()
#q.task_done()

num=q.get()

#num=q.get()#因为没有数据了,所以阻塞了
#num=q.get(block=False)# raise Empty queue.Empty
#num=q.get(timeout=2)# raise Empty queue.Empty
print("num",num)

print(q.empty())#True

线程queue的函数与进程queue基本相同,不再做赘述。

消费者与生产者模式

import queue
import threading
import random

class Producter(threading.Thread):
def __init__(self,queue):
super().__init__()
self.queue=queue
def run(self):
while True:
r = random.randint(0,9)
if not self.queue.full():
self.queue.put(r)
print('往队列里添加了一个数据{}'.format(r))
else:
print('满了')

class Consumer(threading.Thread):
def __init__(self,queue):
super().__init__()
self.queue=queue
def run(self):
while True:
data = self.queue.get()
print("从队列中取出数据{}".format(data))
if __name__ == '__main__':
q = queue.Queue(3)
p1 = Producter(q)
c1 = Consumer(q)
p1.start()
c1.start()
# p1.join()
# c1.join()
# q.join()

线程锁

线程为什么需要锁?

共享内存间存在竞争问题,使用锁来控制共享资源的访问

from threading import Thread,Lock

lock = Lock()

def incr():
lock.acquire()#加锁
print("线程1")
lock.release()#释放锁
# with lock:#这个相当于上面两行代码
# print("线程1")
def decr():
lock.acquire()
print("线程2")
lock.release()
#with lock:
# print("线程2")

t_incr = Thread(target=incr)
t_decr = Thread(target=decr)
t_incr.start()
t_decr.start()

线程有两种锁一种Lock,一种RLock。了解不深,日后补充。

线程池

python自带线程池是在multiprocessing.pool模块里的ThreadPool,使用方式和进程池几乎一样。

代码示意:

#线程池
from multiprocessing.pool import ThreadPool
import time
pool=ThreadPool(4)
msg_list=[1,2,3,4]
def task():
time.sleep(2)
print("...")
def work(msg):
time.sleep(2)
print(msg)
t1=time.time()
pool.map(work,msg_list) #map()第一个参数是一个函数,第二个参数是一个迭代器,迭代器的元素会逐个放到函数里
#pool.apply_async(task)
#pool.map_async(work,msg_list)
#pool.apply_async(work,args=(msg_list,))#第一个是函数,第二个是参数,参数直接传入函数中,用了async,主线程就不会等待子线程完成,所以要有如下操作
pool.close()
pool.join()
print(time.time()-t1)

'''
pool.map()
pool.map_async()
pool.apply()
pool.apply_async()
'''

线程池实现并发服务器

import socket
from multiprocessing.pool import ThreadPool

server=socket.socket()
server.bind(('',6666))
server.listen(5)
print("服务端等待连接")

def readable(conn,addr):
print("客户端{}建立连接".format(addr))
while True:
data=conn.recv(1024)
print(data)
if data:
conn.send(data)
if __name__ == '__main__':
pool = ThreadPool(2)
while True:
conn,addr = server.accept()
pool.apply_async(readable,args=(conn,addr))
pool.close()
pool.join()

自己实现线程池的逻辑:

import threading
import queue
import time

class Mythread(threading.Thread):
def __init__(self,queue):
super().__init__()
self.queue=queue
self.setDaemon(True)#设为守护线程,主线程结束子线程跟着结束
self.start()
def run(self):
while True:
func,args,kwargs = self.queue.get()
func(*args,**kwargs)
self.queue.task_done() #告诉线程,队列获取已经结束,不然,get不到东西会一直阻塞
def join(self, timeout=None):
self.queue.join()


class MyThreadPool():
def __init__(self,num):
self.num = num
self.queue = queue.Queue()
for _ in range(self.num):
Mythread(self.queue)
def apply_async(self,func,args=(),kwargs={}):
self.queue.put((func,args,kwargs))
def join(self):
self.queue.join()
def func1():
print(threading.current_thread().getName())
time.sleep(2)
if __name__ == '__main__':
t1=time.time()
pool=MyThreadPool(2)
for i in range(4):
pool.apply_async(func1)
pool.join()
print(time.time()-t1)



举报

相关推荐

0 条评论