0
点赞
收藏
分享

微信扫一扫

多进程(python)


文章目录

  • ​​添加进程Process​​
  • ​​完整的线程和进程创建使用对比代码​​
  • ​​运用​​
  • ​​存储进程输出 Queue​​
  • ​​把结果放在 Queue 里​​
  • ​​主函数​​
  • ​​进程池Pool​​
  • ​​进程池 Pool() 和 map()​​
  • ​​自定义核数量​​
  • ​​apply_async()​​
  • ​​用 apply_async() 输出多个结果​​
  • ​​共享内存 shared memory​​
  • ​​Shared Value​​
  • ​​Shared Array​​
  • ​​参考数据形式​​
  • ​​进程锁 Lock​​
  • ​​不加进程锁​​
  • ​​加进程锁​​
  • ​​参考​​

添加进程Process

完整的线程和进程创建使用对比代码

import multiprocessing as mp
import threading as td

def job(a,d):
print('aaaaa')

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()

从上面的使用对比代码可以看出,线程和进程的使用方法相似

运用

在运用时需要添加上一个定义main函数的语句

if __name__=='__main__':

完整的应用代码:

import multiprocessing as mp

def job(a,d):
print('aaaaa')

if __name__=='__main__':
p1 = mp.Process(target=job,args=(1,2))
p1.start()
p1.join()

运行环境要在terminal环境下,可能其他的编辑工具会出现运行结束后没有打印结果,在terminal中的运行后打印的结果为:

aaaaa

存储进程输出 Queue

Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果

把结果放在 Queue 里

定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果

#该函数没有返回值!!!
def job(q):
res=0
for i in range(1000):
res+=i+i**2+i**3
q.put(res) #queue

主函数

定义一个多线程队列,用来存储结果

if __name__=='__main__':
q = mp.Queue()

定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错

p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))

分别启动、连接两个线程

p1.start()
p2.start()
p1.join()
p2.join()

上面是分两批处理的,所以这里分两批输出,将结果分别保存

res1 = q.get()
res2 = q.get()

打印最后的运算结果

print(res1+res2)
import multiprocessing as mp

def job(q):
res=0
for i in range(1000):
res+=i+i**2+i**3
q.put(res) #queue

if __name__=='__main__':
q = mp.Queue()
p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1+res2)

运行的时候还是要在terminal中,最后运行结果为

499667166000

进程池Pool

进程池就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题

首先import multiprocessing和定义job()

import multiprocessing as mp

def job(x):
return x*x

进程池 Pool() 和 map()

然后我们定义一个Pool

pool = mp.Pool()

有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。

接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果

res = pool.map(job, range(10))

让我们来运行一下

def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)

if __name__ == '__main__':
multicore()

运行结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

自定义核数量

Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量

def multicore():
pool = mp.Pool(processes=3) # 定义CPU核数量为3
res = pool.map(job, range(10))
print(res)

apply_async()

Pool除了map()外,还有可以返回结果的方式,那就是apply_async().

apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值

def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get获得结果
print(res.get())

运行结果

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async()

用 apply_async() 输出多个结果

那么如何用apply_async()输出多个迭代呢?

我们在apply_async()中多传入几个值试试

res = pool.apply_async(job, (2,3,4,))

结果会报错:

TypeError: job() takes exactly 1 argument (3 given)

即apply_async()只能输入一组参数。

在此我们将apply_async() 放入迭代器中,定义一个新的multi_res

multi_res = [pool.apply_async(job, (i,)) for i in range(10)]

同样在取出值时需要一个一个取出来

print([res.get() for res in multi_res])

合并代码

def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get获得结果
print(res.get())
# 迭代器,i=0时apply一次,i=1时apply一次等等
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
# 从迭代器中取出
print([res.get() for res in multi_res])

运行结果

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res

可以看出在apply用迭代器的得到的结果和用map得到的结果是一样的

共享内存 shared memory

只有用共享内存才能让CPU之间有交流。

Shared Value

我们可以通过使用Value数据存储在一个共享的内存表中。

import multiprocessing as mp

value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。更多的形式请查看本页最后的表.

Shared Array

在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。

array = mp.Array('i', [1, 2, 3, 4])

这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。 我们会在后一节举例说明这两种的使用方法.

错误形式

array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list

"""
TypeError: an integer is required
"""

参考数据形式

| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| ​​​'b'​​​ | signed char | int | 1 |
| ​​​'B'​​​ | unsigned char | int | 1 |
| ​​​'u'​​​ | Py_UNICODE | Unicode character | 2 |
| ​​​'h'​​​ | signed short | int | 2 |
| ​​​'H'​​​ | unsigned short | int | 2 |
| ​​​'i'​​​ | signed int | int | 2 |
| ​​​'I'​​​ | unsigned int | int | 2 |
| ​​​'l'​​​ | signed long | int | 4 |
| ​​​'L'​​​ | unsigned long | int | 4 |
| ​​​'q'​​​ | signed long long | int | 8 |
| ​​​'Q'​​​ | unsigned long long | int | 8 |
| ​​​'f'​​​ | float | float | 4 |
| ​​​'d'​​ | double | float | 8 |

进程锁 Lock

不加进程锁

让我们看看没有加进程锁时会产生什么样的结果。

import multiprocessing as mp
import time

def job(v, num):
for _ in range(5):
time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
v.value += num # v.value获取共享变量值
print(v.value, end="")

def multicore():
v = mp.Value('i', 0) # 定义共享变量
p1 = mp.Process(target=job, args=(v,1))
p2 = mp.Process(target=job, args=(v,3)) # 设定不同的number看如何抢夺内存
p1.start()
p2.start()
p1.join()
p2.join()

if __name__ == '__main__':
multicore()

在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。

运行一下:

1
4
5
8
9
12
13
16
17
20

我们可以看到,进程1和进程2在相互抢着使用共享内存v

加进程锁

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

首先需要定义一个进程锁

l = mp.Lock() # 定义一个进程锁

然后将进程锁的信息传入各个进程中

p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
p2 = mp.Process(target=job, args=(v,3,l))

在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # v.value获取共享内存
print(v.value)
l.release() # 释放

完整代码:

def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # 获取共享内存
print(v.value)
l.release() # 释放

def multicore():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
p2 = mp.Process(target=job, args=(v,3,l))
p1.start()
p2.start()
p1.join()
p2.join()

if __name__ == '__main__':
multicore()

运行一下,让我们看看是否还会出现抢占资源的情况:

1
2
3
4
5
8
11
14
17
20

显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

参考

​​https://mofanpy.com/tutorials/python-basic/multiprocessing/​​


举报

相关推荐

0 条评论