Foreword
Over years of Python development, I have noticed that many developers carry misconceptions about concurrent programming. Drawing on personal project experience, this article takes a close look at the two core modules, threading and multiprocessing, using real examples to show where each fits and where each bites.
1. The Essential Difference Between Threads and Processes
1.1 Memory Model Differences
import threading
import multiprocessing

# Shared-variable test
shared_data = 0

def thread_task():
    global shared_data
    for _ in range(100_000):
        shared_data += 1  # not atomic: load, add, store can interleave

def process_task(data):
    with data.get_lock():  # Value's += is not atomic without its lock
        data.value += 1

if __name__ == '__main__':
    # Thread test
    threads = []
    for _ in range(10):
        t = threading.Thread(target=thread_task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(f"Thread result: {shared_data}")  # nondeterministic; often less than 1,000,000

    # Process test
    processes = []
    shared_value = multiprocessing.Value('i', 0)
    for _ in range(10):
        p = multiprocessing.Process(target=process_task, args=(shared_value,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"Process result: {shared_value.value}")  # 10 -- a plain global would have stayed 0 in the parent
Key findings:
- Threads share one address space, so you must handle race conditions yourself
- Process memory is isolated; data must be shared through dedicated mechanisms such as multiprocessing.Value
2. Practical threading Techniques
2.1 The Art of Locking
import threading
from threading import Lock

class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value

# Test case
counter = ThreadSafeCounter()

def worker():
    for _ in range(1000):
        counter.increment()

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Final counter: {counter.value}")  # correctly prints 10000
Lessons learned:
- Prefer the with statement for managing locks
- Fine-grained locks improve concurrency
- Avoid deadlocks caused by nested locks (see the ordering sketch below)
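To make the last point concrete, the standard defense against nested-lock deadlock is to pick one global acquisition order and follow it everywhere. A minimal sketch (the lock names are illustrative):

import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def update_both_forward():
    # Always acquire lock_a before lock_b, in every code path.
    with lock_a:
        with lock_b:
            ...  # critical section touching both resources

def update_both_reverse():
    # Even when lock_b's resource is "logically" first, keep the same order;
    # two threads taking the locks in opposite orders is the classic deadlock.
    with lock_a:
        with lock_b:
            ...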
2.2 Implementing the Producer-Consumer Pattern
import queue
import random
import threading
import time

def producer(q, items):
    for item in items:
        q.put(item)
        print(f"Produced {item}")
        time.sleep(random.uniform(0.1, 0.5))

def consumer(q):
    while True:
        try:
            item = q.get(timeout=1)
            print(f"Consumed {item}")
            q.task_done()
        except queue.Empty:
            break

# Set up the queue and threads
work_queue = queue.Queue(5)  # bounded queue applies backpressure to producers
producers = [
    threading.Thread(target=producer, args=(work_queue, range(10))),
    threading.Thread(target=producer, args=(work_queue, 'ABCD')),
]
consumers = [threading.Thread(target=consumer, args=(work_queue,)) for _ in range(3)]
for p in producers:
    p.start()
for c in consumers:
    c.start()
for p in producers:
    p.join()       # make sure everything has been put before waiting on the queue
work_queue.join()  # wait until every produced item has been processed
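One fragility worth noting: the consumers above exit after a one-second timeout, so a producer that stalls longer than that strands the remaining items. A common alternative is to push one sentinel per consumer once all producers have finished; a sketch of the changed pieces (reusing work_queue and consumers from above):

SENTINEL = object()  # unique marker that can never collide with real data

def consumer(q):
    while True:
        item = q.get()  # block indefinitely; shutdown is now explicit
        if item is SENTINEL:
            q.task_done()
            break
        print(f"Consumed {item}")
        q.task_done()

# After joining all producers:
# for _ in consumers:
#     work_queue.put(SENTINEL)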
3. Advanced multiprocessing Usage
3.1 Process Pool Best Practices
from multiprocessing import Pool
import os

def cpu_bound_task(x):
    print(f"Process {os.getpid()} handling {x}")
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # map_async is the non-blocking variant
        result = pool.map_async(cpu_bound_task, range(10))
        print("Main process continuing...")
        print(result.get())  # blocks until the results are ready

    # Use imap for large data streams
    with Pool(2) as pool:
        for res in pool.imap(cpu_bound_task, range(100), chunksize=10):
            print(f"Got result: {res}")
Caveats:
- Creating a process pool is expensive; pools suit long-running workloads
- The chunksize parameter can affect performance significantly (see the measurement sketch below)
- Always guard the entry point with if __name__ == '__main__'
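The chunksize effect is easy to measure yourself. A minimal sketch (absolute numbers depend on the machine; for cheap tasks, larger chunks usually win because each chunk costs one IPC round trip):

import time
from multiprocessing import Pool

def tiny_task(x):
    return x * x  # deliberately cheap, so IPC overhead dominates

if __name__ == '__main__':
    for chunksize in (1, 10, 100):
        with Pool(4) as pool:
            start = time.perf_counter()
            pool.map(tiny_task, range(100_000), chunksize=chunksize)
            print(f"chunksize={chunksize}: {time.perf_counter() - start:.2f}s")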
3.2 Shared Memory Optimization
import multiprocessing
from multiprocessing import shared_memory
import numpy as np

def process_worker(shm_name):
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    numpy_array = np.ndarray((10,), dtype=np.int64, buffer=existing_shm.buf)
    numpy_array[0] += 1  # modify shared data (unsynchronized -- see note below)
    del numpy_array      # release the buffer view before closing
    existing_shm.close()

if __name__ == '__main__':
    # Create the shared memory block
    arr = np.zeros(10, dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    shm_array = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    np.copyto(shm_array, arr)

    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=process_worker, args=(shm.name,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

    print("Final array:", shm_array)  # usually [5, 0, ...], but the unlocked += can lose updates
    del shm_array  # drop the view so close() does not raise BufferError
    shm.close()
    shm.unlink()   # free the shared memory block
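Because numpy_array[0] += 1 is an unsynchronized read-modify-write, concurrent workers can lose updates. If exact counts matter, pass a multiprocessing.Lock to each worker; a sketch of the changed parts, building on the code above:

def process_worker(shm_name, lock):
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    numpy_array = np.ndarray((10,), dtype=np.int64, buffer=existing_shm.buf)
    with lock:               # serialize the read-modify-write
        numpy_array[0] += 1
    del numpy_array
    existing_shm.close()

# In the main block:
# lock = multiprocessing.Lock()
# p = multiprocessing.Process(target=process_worker, args=(shm.name, lock))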
4. Performance Comparison Tests
4.1 IO-Bound Tasks
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def fetch_url(url):
    response = requests.get(url)
    return len(response.text)

# Thread version
def thread_version(urls):
    with ThreadPoolExecutor(max_workers=10) as executor:
        list(executor.map(fetch_url, urls))

# Process version
def process_version(urls):
    with ProcessPoolExecutor(max_workers=10) as executor:
        list(executor.map(fetch_url, urls))

# Test results (10 URLs):
# Threads:   1.2s
# Processes: 1.5s
4.2 CPU-Bound Tasks
def calculate_prime(n):
    primes = []
    for num in range(2, n + 1):
        if all(num % i != 0 for i in range(2, int(num**0.5) + 1)):
            primes.append(num)
    return primes

# Test results (n = 100000):
# Single thread:         12.7s
# Threads (4 workers):   13.1s (GIL-bound)
# Processes (4 workers):  3.8s
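Those figures are from my machine; to reproduce the comparison, a minimal timing harness along these lines works (it reuses calculate_prime from above and matches the worker counts quoted):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def timed(label, fn):
    start = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - start:.1f}s")

if __name__ == '__main__':
    n, workers = 100_000, 4
    timed("Single thread", lambda: [calculate_prime(n) for _ in range(workers)])
    with ThreadPoolExecutor(workers) as ex:
        timed("Threads", lambda: list(ex.map(calculate_prime, [n] * workers)))
    with ProcessPoolExecutor(workers) as ex:
        timed("Processes", lambda: list(ex.map(calculate_prime, [n] * workers)))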
5. Common Pitfalls and Solutions
- GIL misconceptions:
  - It only serializes the execution of pure-Python bytecode
  - C extensions (numpy, for example) can release the GIL (see the sketch after this list)
- Zombie processes: always reap the children you spawn:

def safe_process():
    p = multiprocessing.Process(target=...)  # target elided in the original
    p.start()
    p.join()  # must call join() (or terminate()) so the child is reaped
- Thread-local storage: threading.local() gives each thread its own attribute namespace:

thread_local = threading.local()

def show_thread_data():
    if not hasattr(thread_local, 'value'):
        thread_local.value = threading.get_ident()
    print(thread_local.value)
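Back to the GIL point: numpy's BLAS-backed matrix multiply drops the GIL while it runs, so plain threads can genuinely overlap the work. A minimal sketch (the speedup depends on your BLAS build; a BLAS that is already multi-threaded internally will narrow the gap):

import threading
import time
import numpy as np

a = np.random.rand(2000, 2000)

def heavy_matmul():
    _ = a @ a  # BLAS call; the GIL is released for its duration

# Sequential baseline: four multiplies back to back
start = time.perf_counter()
for _ in range(4):
    heavy_matmul()
print(f"Sequential: {time.perf_counter() - start:.2f}s")

# Threaded: the four multiplies can overlap despite the GIL
start = time.perf_counter()
threads = [threading.Thread(target=heavy_matmul) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threaded:   {time.perf_counter() - start:.2f}s")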
6. Modern Alternatives
threading and multiprocessing are the standard-library workhorses, but some scenarios are better served by:
- asyncio: suited to high-concurrency IO (a taste of it below)
- concurrent.futures: a higher-level interface over both threads and processes
- joblib: friendly to scientific-computing workloads
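A taste of the asyncio model, as a minimal sketch with asyncio.sleep standing in for real network IO:

import asyncio

async def fetch(i):
    await asyncio.sleep(0.5)  # stand-in for an awaitable network call
    return i * i

async def main():
    # All ten coroutines wait concurrently, so this takes ~0.5s rather than ~5s
    results = await asyncio.gather(*(fetch(i) for i in range(10)))
    print(results)

asyncio.run(main())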
Conclusion
Validated across several projects, these principles have held up:
- IO-bound: prefer threads or asyncio
- CPU-bound: multiprocessing is the only route to real parallel speedups under the GIL
- Mixed workloads: consider a process pool combined with threads (sketched below)
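For the mixed case, one workable shape is a process pool for the CPU-heavy outer work, with each worker fanning its blocking IO out over a small thread pool. A sketch with made-up task functions, not a drop-in recipe:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def io_subtask(x):
    time.sleep(0.1)  # stand-in for a blocking network or disk call
    return x

def cpu_plus_io(chunk):
    # Threads overlap the IO waits inside this process; the CPU-bound
    # reduction then runs with the whole core to itself.
    with ThreadPoolExecutor(max_workers=4) as tpe:
        fetched = list(tpe.map(io_subtask, chunk))
    return sum(v * v for v in fetched)

if __name__ == '__main__':
    chunks = [list(range(i, i + 10)) for i in range(0, 40, 10)]
    with ProcessPoolExecutor(max_workers=4) as ppe:
        print(list(ppe.map(cpu_plus_io, chunks)))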
Remember: there is no silver bullet; the best solution depends on your specific scenario. Always run performance tests on the critical path.