python-20-多进程多线程详解
一.说明
在python中的基础系列我们终于来到了多进程和多线程的知识点了,在学习这个知识点前,我想问问都是用来进行并发处理的,为啥要区分多进程 多线程?或者说什么是多进程 什么是多线程?
多进程:
1.多进程是通过创建多个独立的进程来并行执行任务
举个例子:你电脑是8核心的,那么可以创建超过8个进程,但是超过8个进程会导致上下文切换开销增加,从而导致降低性能!
说句人话,当进程数小于cpu核心数,每个进程都能分配到自己的核心进行运行,不必进行进程切换,从而没有切换上下文的开销!!
所以,多进程并不是越多越好!!
多线程:
- 线程 是程序中执行流的最小单元,一个进程可以包含多个线程;
- 多线程是通过在同一进程中创建多个线程来并发执行任务,如果任务是 I/O 密集型的(例如网络请求、文件操作),由于 GIL 释放给了 I/O 操作,其他线程可以在 GIL 被释放时执行,因此多线程仍然能够在 I/O 密集型任务中提高并发性能。
- 多个线程共享同一进程的内存空间和资源,线程之间的创建和切换比进程更加轻量级;
- 对于 CPU 密集型任务,Python 的 全局解释器锁(GIL) 会限制线程的并行执行,这就使得多线程在 CPU 密集型任务上无法提供预期的性能提升;
- Python 的 全局解释器锁(GIL):
在 Python 中,全局解释器锁(GIL) 会影响多线程的并发执行。GIL 只允许一个线程在任何时刻执行 Python 字节码,这意味着对于 CPU 密集型任务,Python 的多线程不能利用多核 CPU 的计算能力,只能在单个核心上执行。
- 线程池的大小不可随便确定
CPU 核心数:对于 CPU 密集型任务,线程池的大小通常不会超过 CPU 核心数,因为超出核心数的线程会增加上下文切换的开销,反而降低性能。
I/O 密集型任务:对于 I/O 密集型任务,线程池的大小可以更大,因为线程在等待 I/O 操作完成时不会占用 CPU 资源,操作系统会将 CPU 分配给其他线程。
内存和系统资源:每个线程都需要一定的内存和系统资源,过多的线程会导致内存消耗过大,甚至会导致系统资源耗尽。 - 线程可以跨多个 CPU 核心运行,操作系统会管理线程在核心之间的调度。
二.多进程
1. os.fork() 函数 (不推荐,不能跨平台,只能Linux、Unix )
它在当前进程的上下文中创建一个子进程。子进程是当前进程(称为父进程)的副本,但拥有自己的进程ID(PID)
fork()
的基本原理
- 返回值:
- 在父进程中,
fork()
返回新创建的子进程的进程 ID。 - 在子进程中,
fork()
返回 0。 - 如果创建子进程失败,
fork()
返回 -1,并抛出OSError。
- 复制进程:子进程会复制父进程的内存空间,但实际上是采用写时复制(copy-on-write)机制,只有在需要写入时才会真正复制内存,从而节省资源。
示例
import os
import time
def main():
pid = os.fork() # 创建子进程
if pid < 0:
# fork() 失败
print("Fork failed")
elif pid == 0:
# 子进程执行的代码
print(f"Child process: My PID is {os.getpid()}")
time.sleep(2) # 模拟子进程工作
print("Child process: Work done!")
else:
# 父进程执行的代码
print(f"Parent process: My PID is {os.getpid()} and my child's PID is {pid}")
os.wait() # 等待子进程结束
print("Parent process: Child has terminated.")
if __name__ == "__main__":
main()
2.multiprocessing.Process()函数
- multiprocessing模块提供Process类实现新建进程
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
if __name__ == '__main__':
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
'''
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 4 starting
Worker 0 finished
Worker 1 finished
Worker 2 finished
Worker 3 finished
Worker 4 finished
'''
- pool = multiprocessing.Pool(processes=5) # 创建进程池实现多进程
pool.apply()
和pool.apply_async()
:
apply()
:类似于顺序调用函数,它会阻塞,直到任务执行完成。apply_async()
:异步执行任务,不会阻塞主程序。
pool.map()
和 pool.map_async()
:
map()
:将函数func
应用到iterable
中的每一个元素,返回结果列表,并且是同步执行的。map_async()
:map()
的异步版本,返回一个AsyncResult
对象,允许你在执行期间继续做其他工作。
pool.close()
和 pool.join()
:
close()
:\# 关闭进程池,表示不能在往进程池中添加进程join()
:\# 等待进程池中的所有进程执行完毕,必须在close()之后调用
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
if __name__ == '__main__':
# 创建一个进程池,最多同时运行 5 个进程
with multiprocessing.Pool(processes=5) as pool:
# 使用 map 方法将任务分配给进程池,map 会阻塞,直到所有任务完成
pool.map(worker, range(5))
###########利用.get()方法来阻塞和等待结果
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
return num * 2
if __name__ == '__main__':
# 使用进程池
with multiprocessing.Pool(processes=5) as pool:
# 使用 apply_async 异步执行任务
async_result = pool.apply_async(worker, (1,))
print(f'Result of apply_async: {async_result.get()}') # 阻塞等待结果
# 使用 map_async 异步执行任务
async_results = pool.map_async(worker, range(5))
print(f'Result of map_async: {async_results.get()}') # 阻塞等待所有结果
####################利用 pool.close()和pool.join()
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
return num
if __name__ == '__main__':
# 创建进程池,最多同时运行 5 个进程
resList = []
with multiprocessing.Pool(processes=5) as pool:
# 使用 apply_async 异步提交任务
results = []
for i in range(5):
result = pool.apply_async(worker, (i,))
results.append(result) # 存储每个任务的 AsyncResult 对象
# 关闭进程池,不再接受新任务
pool.close()
# 等待所有任务完成并获取结果
for result in results:
resList.append(result.get()) # 阻塞,等待每个任务的完成
# 等待所有子进程结束
pool.join()
print(resList)
'''
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 4 starting
Worker 2 finished
Worker 0 finished
Worker 1 finished
Worker 3 finished
Worker 4 finished
[0, 1, 2, 3, 4]
'''
- 多进程获取进程返回结果
multiprocessing.Process 本身并不能直接返回值,我们需要Queue (队列)来实现收集结果
import multiprocessing
import time
def worker(num, queue):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
queue.put(num * 2) # 将计算结果放入队列中
if __name__ == '__main__':
processes = []
queue = multiprocessing.Queue() # 创建一个队列用于进程间通信
# 创建并启动 5 个进程
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, queue))
processes.append(p)
p.start()
# 等待所有进程完成
for p in processes:
p.join()
# 获取并打印每个进程的返回值
while not queue.empty(): # 确保队列中所有结果都已获取
result = queue.get()
print(f'Result from worker: {result}')
- 进程池获取返回结果
Pool.map()
:是并行计算并按顺序返回结果,适合任务较简单且结果需要按顺序返回的场景;Pool.apply_async()
:是异步调用任务,适合任务较复杂或需要更灵活控制的场景。需要使用get()
来获取任务结果。Pool.map_async()
:是map()
的异步版本,返回一个AsyncResult
对象。
#利用map
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
return num * 2 # 返回任务的结果
if __name__ == '__main__':
# 创建一个进程池,最多同时运行 5 个进程
with multiprocessing.Pool(processes=5) as pool:
# 使用 map 并行地执行任务
results = pool.map(worker, range(5)) # 返回每个进程的返回值列表
# 打印所有结果
print("Results:", results)
######利用get方法
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
return num * 2
if __name__ == '__main__':
# 创建进程池
with multiprocessing.Pool(processes=5) as pool:
# 使用 apply_async 异步提交任务
results = [pool.apply_async(worker, (i,)) for i in range(5)]
# 获取每个任务的返回值
output = [result.get() for result in results] # 使用 get() 等待任务完成并获取结果
print("Results:", output)
'''
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 4 starting
Worker 0 finished
Worker 2 finished
Worker 1 finished
Worker 3 finished
Worker 4 finished
Results: [0, 2, 4, 6, 8]
'''
################错误map_async用法
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
return num * 2
if __name__ == '__main__':
# 创建进程池
with multiprocessing.Pool(processes=5) as pool:
# 使用 map_async 异步提交任务
results = [pool.map_async(worker, (i,)) for i in range(5)]
# 获取每个任务的返回值
output = [result.get() for result in results] # 使用 get() 等待任务完成并获取结果
print("Results:", output)
'''
Worker 0 starting
Worker 2 starting
Worker 1 starting
Worker 3 starting
Worker 4 starting
Worker 2 finished
Worker 0 finished
Worker 1 finished
Worker 3 finished
Worker 4 finished
Results: [[0], [2], [4], [6], [8]]
'''
#########正确map_async用法
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
return num * 2
if __name__ == '__main__':
# 创建进程池
with multiprocessing.Pool(processes=5) as pool:
# 使用 map_async 异步提交任务
result = pool.map_async(worker, range(5))
# 获取每个任务的返回值
output = result.get() # 使用 get() 等待任务完成并获取结果
print("Results:", output)
'''
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 4 starting
Worker 0 finished
Worker 2 finished
Worker 1 finished
Worker 3 finished
Worker 4 finished
Results: [0, 2, 4, 6, 8]
'''
注意:
上面的例子是同样的写法,但结果却不一样:
apply_async获取的结果是[0, 2, 4, 6, 8]
map_async获取的结果是[[0], [2], [4], [6], [8]]
为什么2个代码基本结构一样,但是执行的结构却不一样?
因为apply_async
是异步产生5个独立的任务,每个任务都在独立的进程中运行,并且 apply_async()
返回的是 AsyncResult
对象;
map_async()
提供了一个 包含单个元素的元组 (i,)
,这意味着它会按顺序执行 5 次 map_async()
,每次提交 单个任务,这与 apply_async()
的行为不同,实际上是将任务映射到整个可迭代对象的所有元素,它会在内部处理多个任务,而返回的是一个包含所有结果的 AsyncResult
对象,因此输出和任务提交的方式不一样;
通过这个例子能更好的理解apply_async
和map_async()
的区别
- 进程间通讯方式
进程间通信(IPC, Inter-Process Communication) 是指多个进程之间交换数据和信息的机制。python中进程间通信方式,主要包括以下几种;
Queue(队列):允许多个进程之间以生产者-消费者模式进行数据交换。
Pipe(管道):通过管道的两端进行数据交换。
Manager(共享内存):提供进程间共享对象(如共享字典、列表等)。
Value 和 Array(共享内存):允许多个进程共享单一的变量或数组。
Lock(锁):确保多个进程对共享资源的同步访问
- Queue(队列)
import multiprocessing
import time
def worker(num, queue):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
queue.put(num * 2) # 将计算结果放入队列中
if __name__ == '__main__':
processes = []
queue = multiprocessing.Queue() # 创建一个队列用于进程间通信
# 创建并启动 5 个进程
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, queue))
processes.append(p)
p.start()
# 等待所有进程完成
for p in processes:
p.join()
# 获取并打印每个进程的返回值
while not queue.empty(): # 确保队列中所有结果都已获取
result = queue.get()
print(f'Result from worker: {result}')
- Pipe(管道)
Pipe
是multiprocessing
模块提供的另一种进程间通信方式,它通过管道的两端进行通信。管道有两个端口,一个用于发送数据(send
),一个用于接收数据(recv
)。
import multiprocessing
import time
def sender(conn):
for i in range(5):
print(f"Sender sending {i}")
conn.send(i) # 发送数据
time.sleep(1)
def receiver(conn):
while True:
data = conn.recv() # 接收数据
if data == "DONE":
break
print(f"Receiver received {data}")
if __name__ == "__main__":
# 创建管道,返回两个连接对象
parent_conn, child_conn = multiprocessing.Pipe()
# 创建进程
p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
parent_conn.send("DONE") # 结束信号
p2.join()
'''
Sender sending 0
Receiver received 0
Sender sending 1
Receiver received 1
Sender sending 2
Receiver received 2
Sender sending 3
Receiver received 3
Sender sending 4
Receiver received 4
'''
- Manager(共享内存)
Manager
是multiprocessing
模块中提供的一个特殊对象,它允许不同进程共享数据。Manager
可以创建共享对象,比如共享的列表、字典等,这些对象会在进程间同步。
import multiprocessing
import time
def worker(shared_dict):
for i in range(5):
shared_dict[i] = f"Task {i}"
print(f"Worker added: {shared_dict[i]}")
time.sleep(1)
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
shared_dict = manager.dict() # 创建一个共享字典
p = multiprocessing.Process(target=worker, args=(shared_dict,))
p.start()
p.join()
print(f"Shared dict: {shared_dict}")
'''
Worker added: Task 0
Worker added: Task 1
Worker added: Task 2
Worker added: Task 3
Worker added: Task 4
Shared dict: {0: 'Task 0', 1: 'Task 1', 2: 'Task 2', 3: 'Task 3', 4: 'Task 4'}
'''
- Value 和 Array(共享内存)
Value
和Array
是multiprocessing
模块中的两种共享内存对象,可以用于在多个进程之间共享数据。Value
用于存储单一数据类型的值,而Array
用于存储数组(类似于列表)。
import multiprocessing
import time
def worker(shared_value):
for i in range(5):
shared_value.value += 1 # 修改共享变量
print(f"Worker incremented: {shared_value.value}")
time.sleep(1)
if __name__ == "__main__":
# 创建一个共享变量
shared_value = multiprocessing.Value('i', 0) # 'i' 表示整型
p = multiprocessing.Process(target=worker, args=(shared_value,))
p.start()
p.join()
print(f"Final shared value: {shared_value.value}")
'''
Worker incremented: 1
Worker incremented: 2
Worker incremented: 3
Worker incremented: 4
Worker incremented: 5
Final shared value: 5
'''
- Lock(锁)
在多个进程共享数据时,往往需要确保数据的一致性和线程安全。Lock
是 Python 提供的一种同步原语,它用于确保同一时刻只有一个进程能够访问某个共享资源。
import multiprocessing
import time
def worker(lock, shared_value):
for _ in range(5):
with lock:# 使用锁来保证线程安全
shared_value.value += 1
print(f"Worker incremented: {shared_value.value}")
time.sleep(1)
if __name__ == "__main__":
shared_value = multiprocessing.Value('i', 0) # 创建一个共享变量
lock = multiprocessing.Lock() # 创建一个锁
processes = [multiprocessing.Process(target=worker, args=(lock, shared_value)) for _ in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"Final shared value: {shared_value.value}")
三.多线程
多线程是通过在同一进程中创建多个线程来并发执行任务。线程共享进程的内存空间,因此它们之间可以更容易地共享数据,但也容易引发线程安全问题。由于 GIL 的存在,Python 的多线程在 CPU 密集型任务上通常不能提高性能,但在 I/O 密集型任务上表现良好。
- 基本用法
Python 中可以使用 threading
模块来实现多线程。以下是一个简单的示例:
import threading
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
- 线程池
线程池是一种设计模式,用于管理一定数量的线程来处理多个任务。线程池的目的是避免频繁地创建和销毁线程带来的开销,同时通过复用线程来提高程序的效率。
在 Python 中,concurrent.futures.ThreadPoolExecutor
提供了线程池的功能,它会管理一定数量的线程,并按需调度这些线程来执行任务。
import concurrent.futures
import time
def worker(num):
print(f"Worker {num} starting")
time.sleep(2)
print(f"Worker {num} finished")
return num * 2 # 返回结果
# 使用线程池执行任务
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
results = [executor.submit(worker, i) for i in range(5)]
# 获取每个任务的结果
output = [result.result() for result in results]
print("Results:", output)
'''
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 2 finished
Worker 0 finished
Worker 3 starting
Worker 4 starting
Worker 1 finished
Worker 4 finished
Worker 3 finished
Results: [0, 2, 4, 6, 8]
'''
- 线程锁
import threading
import time
# 共享资源(全局变量)
shared_value = 0
# 创建一个锁对象
lock = threading.Lock()
# 线程工作函数
def increment():
global shared_value
# 获取锁,防止其他线程访问共享资源
lock.acquire() # 获取锁
try:
print(f"Thread {threading.current_thread().name} is incrementing shared_value")
current_value = shared_value
time.sleep(0.5) # 模拟一些工作
shared_value = current_value + 1
print(f"Thread {threading.current_thread().name} updated shared_value to {shared_value}")
finally:
# 无论是否发生异常,都要释放锁
lock.release() # 释放锁
if __name__ == "__main__":
# 创建多个线程
threads = [threading.Thread(target=increment, name=f"Thread-{i}") for i in range(5)]
# 启动线程
for t in threads:
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print(f"Final shared_value: {shared_value}")
五.总结
Python 多线程、多进程初学编程的人可能觉得很复杂 其实静下来,理解他 就知道原来就这么回事,没啥!重点是要理解什么是进程什么是线程!!理解了根据真实项目 照着写就行!切记学编程不是死记硬背!如果你采用死记硬背的方式来学编程,那么你完蛋了!!!你永远背不完!!chatgpt也有错误。。何况人。。人与chatgpt的区别是理解,是思想。。
创作整理不易,请大家多多关注 多多点赞,有写的不对的地方欢迎大家补充,我来整理,再次感谢!