概念:
程序:计算机程序是存储在磁盘上的可执行二进制(或其他类型)文件。
只有把它们加载到内存中,并被操作系统调用,它们才会拥有其自己的生命周期。
进程:则是表示的一个正在执行的程序。
每个进程都拥有自己的地址空间、内存、数据栈,以及其他用于跟踪执行的辅助数据。
操作系统负责其上所有进程的执行。
操作系统会为这些进程合理地分配执行时间。
并发与并行:
个人理解:
并发是一起发送,但不一定一起执行
并行是一起执行
进程的创建:
import multiprocessing
def func(a):
pass
mul=multiprocessing.Process(target=func,args=(a,))
mul.start()
解释说明:
- Process()中的参数,第一个是要调度的函数名,第二个是要穿的参数,是元组,当值此方法中可以传的参数不止这些,还可以传进程的名字name,一个kwargs的字典,还有group(这个不知道是什么)
- mul.start() 进程启动
- 导入此标准库的方式
import multiprocessing
,第二种 from multiprocessing import Process
导入指定模板。
进程在linux和windows的使用不同
linux 中的进程os.fork()
import os
pid=os.fork()
print(pid)
print("---------------")
print("子进程{}".format(os.getpid()))
print("父进程{}".format(os.getppid()))
打印数据:
2937
---------------
子进程2936
父进程2935
0
---------------
子进程2937
父进程2936
为什么会打印两次呢?
os.fork()函数创建进程的时候先复制原有进程(父进程)到子进程,在父进程里返回值是创建的子进程的pid,在子进程里返回值是0,可以通过os.getpid()函数获取当前进程的pid,通过os.getppid()获取父进程的pid。
注:fork()不能用于windows系统
python是一个跨平台的语言,当然要封装出一个windows能用的进程。
multiprocessing就是根据fork封装的一个通用进程模板。
经后使用进程我们也用multiprocessing,而不是用原生的fork。
windows中的进程multiprocessing
在windows中使用进程必须在
if __name__ == '__main__':
下面调用,因为在导入multiprocessing模板的py文件里它同样会把if __name__ == '__main__':
上面的代码调用两次。
import multiprocessing
print("---------------------")
print(multiprocessing.current_process())
print(multiprocessing.active_children())
def func():
print("哈哈")
print("123")
if __name__ == '__main__':
p=multiprocessing.Process(target=func,name="进程一")# 这是为子进程命名
p.start()
print("关闭进程")
print(p.is_alive())
print("当前进程:",multiprocessing.current_process())
print("子进程:",multiprocessing.active_children())
打印输出:
---------------------
<_MainProcess(MainProcess, started)>
[]
123
关闭进程
True
当前进程: <_MainProcess(MainProcess, started)>
子进程: [<Process(进程一, started)>]
---------------------
<_MainProcess(进程一, started)>
[]
123
哈哈
运行两次,第一次是从头到尾一遍,第二次是从头到if上面一遍后执行子进程调用的函数
父进程把相同的一份复制给子进程
从两次打印的数据也能看出第一次执行的是主进程,第二次执行的是子进程。
进程join
import multiprocessing
import time
print('mainprocess start:{}'.format(time.asctime(time.localtime())))
def func():
print('subprocess start:{}'.format(time.asctime(time.localtime())))
time.sleep(3)
print('subprocess end:{}'.format(time.asctime(time.localtime())))
if __name__ == '__main__':
p=multiprocessing.Process(target=func)
p.start()
#p.join()# 等待子进程结束
print('mainprocess end:{}'.format(time.asctime(time.localtime())))
#进程由操作系统调用,线程由python解释器调用
打印:
mainprocess start:Fri Apr 13 16:37:58 2018
mainprocess end:Fri Apr 13 16:37:58 2018
subprocess start:Fri Apr 13 16:37:58 2018
subprocess end:Fri Apr 13 16:38:01 2018
可以看出主进程和子进程同时开始,主进程立马结束,子进程过3秒结束
如果有了p.join()
打印:
mainprocess start:Fri Apr 13 16:41:05 2018
subprocess start:Fri Apr 13 16:41:05 2018
subprocess end:Fri Apr 13 16:41:08 2018
mainprocess end:Fri Apr 13 16:41:08 2018
可以看出子进程结束了,主进程才结束。
守护进程daemon:
p.daemon=True
如果设置了守护进程,主进程结束,子进程不会执行
以面向对象的方式使用进程
from multiprocessing import Process
import time
class myProcess(Process):
def __init__(self):
super().__init__()
def run(self):
time.sleep(2)
print("进程执行")
if __name__ == '__main__':
p=myProcess()
p.start()
进程实现socket
import socket
import multiprocessing
import sys
import os
def readable(conn,addr):
sys.stdin=os.fdopen(0)
while True:
data=conn.recv(1024)
if data:
print("客户端{}发来消息:{}".format(addr,data))#默认子进程会关闭input
msg=input(">>")
conn.send(msg.encode())
else:
break
conn.close()
if __name__ == '__main__':
server = socket.socket()
server.bind(("", 4321))
server.listen(5)
print("开始监听")
while True:
con,addr=server.accept()
mul=multiprocessing.Process(target=readable,args=(con,addr))
mul.start()
进程间的通信(资源共享)
每个进程都拥有自己独立的地址空间、内存、数据栈,所以进程与进程之间信息就不能互通。为了解决这一问题,
multiprocessing封装了一个模块Manager。
进程间通信的解决方案示意图
代码示例:
from multiprocessing import Process,Manager
manager=Manager()#启动服务器进程
a=manager.dict()
def func(a):
a["b"]="aas"
print(a)
p=Process(target=func,args=(a,))
p.start()
p.join()
print(a)
一般常用的空间类型是:
1. mgr.list()
2. mgr.dict()
3. mgr.Queue()
进程间的通信,也可以通过队列来实现。
队列算公共资源吗?
如果只是一个线程/进程在使用,那么它并不算公共资源。但是一旦多个线程/进程在同时使用,那么它就是一个公共资源。
我们是否需要对其加锁?
如果被当作公共资源使用,那么按理说是必须要加锁的。但是,线程安全或进程安全的队列中已经帮我们实现了锁。因此我们不需要再自己使用锁来同步。
队列在python里普通的模板queue,进程也自己封装了一个在Manager里。普通的队列是不能用于进程通信的。
代码示例:
from multiprocessing import Process,Manager
if __name__ == '__main__':
q = Manager().Queue(3)
q.put(1)#入队
print("队列是否满{}".format(q.full()))
print("队列的现有元素个数{}".format(q.qsize()))
print("队列是否为空{}".format(q.empty()))
q.task_done()#任务结束
q.join()# 等待完成(因为前面调用了task_done,表示任务已经完成,所以join这里不会再等待完成)
a=q.get()#出队
print(a)
a = q.get()#get会阻塞,如果队列中没数据了,就会阻塞
print(a)
#打印数据
队列是否满False
队列的现有元素个数1
队列是否为空False
1
进程中使用队列:
from multiprocessing import Process,Manager
class MyProcess(Process):
def __init__(self):
super().__init__()
self.queue=Manager().Queue()
def run(self):
print("进程执行")
self.queue.put(1)
if __name__ == '__main__':
p=MyProcess()
p.start()
#p.join() #不加p.join() 会报错
#报错提示:
进程执行
Process MyProcess-1:
Traceback (most recent call last):
File "F:\anaconda\Anaconda3\lib\multiprocessing\managers.py", line 749, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "F:\anaconda\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "D:\Apython2017.10.28\concurrent\线程与队列\demo3.py", line 10, in run
self.queue.put(1)
File "<string>", line 2, in put
File "F:\anaconda\Anaconda3\lib\multiprocessing\managers.py", line 753, in _callmethod
self._connect()
File "F:\anaconda\Anaconda3\lib\multiprocessing\managers.py", line 740, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "F:\anaconda\Anaconda3\lib\multiprocessing\connection.py", line 485, in Client
c = PipeClient(address)
File "F:\anaconda\Anaconda3\lib\multiprocessing\connection.py", line 686, in PipeClient
_winapi.WaitNamedPipe(address, 1000)
FileNotFoundError: [WinError 2] 系统找不到指定的文件。
原因:
如果有子进程在使用Manage()对象时,在父进程不能使用这个对象,所以要等所有子进程结束即需使用p.join()后方可在父进程使用Manage()的对象。
消费者与生产者模式:
所谓,生产者与消费者模型,其实是把一个需要进程通信的问题
分开考虑。
生产者,只需要往队列里面丢东西(生产者不需要关心消费者)
消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)
代码示例1(自定义进程):
from multiprocessing import Process,Manager
import random
class Producter(Process):
def __init__(self,queue):
super().__init__()
self.queue=queue
def run(self):
while True:
r = random.randint(0,9)
if not self.queue.full():
self.queue.put(r)
print('往队列里添加了一个数据{}'.format(r))
self.queue.task_done()
else:
print('满了')
class Consumer(Process):
def __init__(self,queue):
super().__init__()
self.queue=queue
def run(self):
while True:
data = self.queue.get()
print("从队列中取出数据{}".format(data))
if __name__ == '__main__':
q = Manager().Queue(3)
p1 = Producter(q)
c1 = Consumer(q)
p1.start()
c1.start()
p1.join()
#打印数据:
往队列里添加了一个数据1
往队列里添加了一个数据0
往队列里添加了一个数据1
满了
满了
满了
满了
满了
...
从队列中取出数据8
从队列中取出数据3
往队列里添加了一个数据7
从队列中取出数据9
从队列中取出数据7
往队列里添加了一个数据3
从队列中取出数据3
往队列里添加了一个数据1
从队列中取出数据1
...
代码示例2:
from multiprocessing import Process,Manager
def putfunc(queue):
print("添加数据到队列")
queue.put(1)
def getfunc(queue):
a=queue.get()
print("取出数据{}".format(a))
if __name__ == '__main__':
queue=Manager().Queue()
p1=Process(target=putfunc,args=(queue,))
p2=Process(target=getfunc,args=(queue,))
p1.start()
p2.start()
p1.join()
#打印数据:
添加数据到队列
取出数据1
多进程锁multiprocess.Lock
因为进程之间资源不共享,所以普通进程不需要锁,只有在多进程读写同一个文件时才需要锁(个人理解)
关于锁的使用,进程和线程的锁很相似(因为对进程锁了解不是很深,所以在此不做赘述,日后用到再补充。)
进程池
进程池的概念
python中,进程池内部会维护一个进程序列。当需要时,程序会去进程池中获取一个进程。
如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池的内置方法
apply 从进程池里取一个进程并同步执行
apply_async 从进程池里取出一个进程并异步执行
map 同apply,只是传参不一样
map_async
terminate 立刻关闭进程池
join 主进程等待所有子进程执行完毕,必须在close或terminete之后
close 等待所有进程结束才关闭线程池
代码示例:
#进程池
from multiprocessing import Pool
import time
#进程池和线程池几乎一样的接口
msg_list=[1,2]
def task():
time.sleep(2)
print("...")
def work(msg):
time.sleep(2)
print(msg)
t1=time.time()
if __name__ == '__main__':
pool = Pool(2)
pool.map(work,msg_list) #map()第一个参数是一个函数,第二个参数是一个迭代器,迭代器的元素会逐个放到函数里
print("map耗时{}".format(time.time()-t1))
#pool.apply(task) # apply基本用不上因为它和单进程没区别
#pool.map_async(work,msg_list)
t1 = time.time()
pool.apply_async(work,args=(msg_list,))#第一个是函数,第二个是参数,参数直接传入函数中,用了async,主线程就不会等待子线程完成,所以要有如下操作
pool.apply_async(work, args=(msg_list,))
pool.close()#异步,必须要有close和join,不然进程会跟着主进程结束
pool.join()
print(time.time()-t1)
print("apply_async耗时{}".format(time.time() - t1))
#打印:
1
2
map耗时2.030236005783081
[1, 2]
[1, 2]
2.0051233768463135
apply_async耗时2.005246162414551
使用进程池实现并发服务器
import socket
from multiprocessing import Pool
server=socket.socket()
server.bind(('',6666))
server.listen(5)
print("服务端等待连接")
def readable(conn,addr):
print("客户端{}建立连接".format(addr))
while True:
data=conn.recv(1024)
print(data)
if data:
conn.send(data)
if __name__ == '__main__':
pool = Pool(2)
while True:
conn,addr = server.accept()
pool.apply_async(readable,args=(conn,addr))
#pool.close()
#pool.join()