多进程和多线程
多任务的3种方式:多进程模式、多线程模式、多进程+多线程模式。
多进程
Unix/Linux操作系统提供了一个fork()
系统调用,普通函数调用一次返回一次,但是fork()
调用一次,返回两次,因为系统把当前进程(父进程)复制了一份(子进程),分别在父进程和子进程返回。
import os
from multiprocessing import Process
print('process:%s' % os.getpid())#当前进程
pid = os.fork()#父进程pid==47362 then 子进程pid==0
if pid == 0:
print('child process %s my parent is %s' %(os.getpid(),os.getppid()))
else:
print('%s just create a child process %s' %(os.getpid(),pid))
output:
process:47362
47362 just create a child process 47369
child process 47369 my parent is 47362
而Windows没有fork()调用,python通过multiprocessing支持跨平台的多进程。
def run_proc(name):
print('run child process %s %s' %(name,os.getpid()))
print('parent process %s' % os.getpid())
p = Process(target=run_proc,args=('test',))#目标函数 函数变量
print('child process will start')
p.start()
#join()可以等待子进程结束后再继续执行,用于进程间的同步
p.join()
print('child process end')
output:
Parent process 928.
Process will start.
Run child process test (929)...
Process end.
如果要启动大量的子进程,可以用进程池的方法批量创建子进程
#进程池批量创建子进程
def long_time_task(name):
print('run task %s %s...'% (name,os.getpid()))
start = time.time()
time.sleep(random.random()*3)
end = time.time()
print('task %s run %0.2f seconds' %(name,(end-start)))
print('parent process %s' % (os.getpid()))
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task,args=(i,))
print('waiting for all subprocesses done')
p.close()
p.join()
print('all subprocess done')
output:
parent process 47362
run task 1 47704...
run task 0 47703...
run task 2 47705...
run task 3 47706...
waiting for all subprocesses done
task 3 run 0.62 seconds
run task 4 47706...
task 2 run 0.83 seconds
task 0 run 1.56 seconds
task 1 run 1.59 seconds
task 4 run 2.98 seconds
all subprocess done
对Pool对象调用join()
方法会等待所有子进程执行完毕,调用join()
前必须调用close()
,调用close()
后就不能再添加新的Process。注意这里我们的pool设定的最大限制为4,最多同时执行4个进程,所以task4要等待前面某个task完成后才执行。Pool的默认大小是CPU的核数。
子进程
下述代码相当于在命令行执行nslookup
,然后手动输入
set q=mx
python.org
exit
#进程输入
print('$ nslookup')
p = subprocess.Popen(['nslookup'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('eixt code:',p.returncode)
进程间通信
#进程间通信,建立一个队列Queue
def write(q):
print('process to wtite: %s' % os.getpid())
for value in ['A','B','C']:
print('put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('get %s from queue' % value)
q = Queue()#父进程创建Queue,并传给子进程
pw = Process(target=write,args=(q,))
pr = Process(target=read,args=(q,))
pw.start()#启动子进程pw,写入
pr.start()#启动子进程pr,读取
pw.join()
pr.terminate()#pr一直循环,这里需要强行终止
output:
process to wtite: 47937
process to read: 47938
put A to queue...
get A from queue
put B to queue...
get B from queue
put C to queue...
get C from queue
多线程
python支持多线程有两个模块_thread和threading
启动一个线程就是把一个函数传入并创建Thread
实例,然后调用start()
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n<5:
n = n+1
print('thread %s >>> %s' %(threading.current_thread().name,n))
time.sleep(1)
print('thread %s ended' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop,name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
output:
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread endedthread MainThread ended.#这里没有换行没来得及打印换行符?
任何进程默认启动一个线程–主线程,主线程又可以启动新的线程,python的threading
模块有个current_thread()
,返回当前线程的实例。
Lock
多进程与多线程最大的不同在于,多进程中,同一个变量,各自有一个拷贝存在于每个进程中,互不影响;而多线程中,所有的变量由所有线程共享,任何一个变量都可以被任何一个线程修改,线程间共享数据最大的危险在于多个线程同时修改一个变量可能会把内容改乱了。
我们定义一个共享变量balance,初始值为0,并且启动两个,先存后取,理论值应该为0,但是线程由操作系统决定,当t1,t2交替执行时,循环次数足够多,balance
的结果就不一定是0了。
# Lock
balance = 0
def change_it(n):
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread,args=(5,))
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
原因在于高级语言的一条语句在CPU执行时时若干条语句,即使一个简单的计算balance = balance + n
也分两步
1.计算balance + n
,存入临时变量中
2.将临时变量的值赋给balance
由于x是局部变量,两个线程都有自己的临时变量,当代码正常执行时
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2 # balance = 0
但是t1和t2是交替运行的,操作系统可能以下面顺序执行t1,t2
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8
究其原因,是因为修改balance
需要多条语句,而执行这几条语句时,线程可能中断,导致多个线程把同一个对象的内容改乱了。
如果我们要确保balance
计算正确,就要给change_it()
上一把锁,当某个线程开始执行change_it()
,该线程获得锁,其他线程不能执行change_it()
,等待,当锁被释放后,获得该锁以后才能改,锁只有一个,同一时刻只有一个线程持有该锁,所以不会造成修改的冲突,创建一个锁通过threading.Lock()
来实现
#Lock
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
lock.acquire()
try:
change_it(n)
finally:
lock.release()#try...finally确保锁一定会被释放
锁的好处是确保了某段关键代码智能有一个线程从头到尾完整的执行;缺点也有很多,阻止了多线程并发执行,包含锁的某段代码只能以单线程模式执行,效率大大降低了,由于可以存在多个锁,不同的线程持有不同的锁,可能造成死锁,导致多个线程全部挂起,无法执行和结束,只能靠操作系统强制终止。
多核CPU
一个死循环线程会100%占用一个CPU,如果有两个死循环线程,在多核CPU中,可以占用200%的CPU,也就是两个CPU核心。要想将N核CPU的核心全部跑满,就必须启动N个死循环线程。
#N核CPU的死循环线程
def loop():
x = 0
while True:
x = x^1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
会发现cpu仅使用了一核。虽然python的线程是真正的线程,但解释器执行代码时,有一个GIL锁:任何python线程执行前必须先获得GIL锁,然后,每执行100条字节码,解释器就会自动释放GIL锁,让别的线程有机会执行,这个GIL全局锁实际上把所有线程的执行代码都上了锁,因而多线程在python中只能交替执行。
未完待续…
文章参考自廖雪峰的官方网站