0
点赞
收藏
分享

微信扫一扫

python-20-多进程多线程详解

python-20-多进程多线程详解

一.说明

在python中的基础系列我们终于来到了多进程和多线程的知识点了,在学习这个知识点前,我想问问都是用来进行并发处理的,为啥要区分多进程 多线程?或者说什么是多进程 什么是多线程?

多进程

1.多进程是通过创建多个独立的进程来并行执行任务

举个例子:你电脑是8核心的,那么可以创建超过8个进程,但是超过8个进程会导致上下文切换开销增加,从而导致降低性能!

说句人话,当进程数小于cpu核心数,每个进程都能分配到自己的核心进行运行,不必进行进程切换,从而没有切换上下文的开销!!

所以,多进程并不是越多越好!!

多线程

  1. 线程 是程序中执行流的最小单元,一个进程可以包含多个线程;
  2. 多线程是通过在同一进程中创建多个线程来并发执行任务,如果任务是 I/O 密集型的(例如网络请求、文件操作),由于 GIL 释放给了 I/O 操作,其他线程可以在 GIL 被释放时执行,因此多线程仍然能够在 I/O 密集型任务中提高并发性能。
  3. 多个线程共享同一进程的内存空间和资源,线程之间的创建和切换比进程更加轻量级;
  4. 对于 CPU 密集型任务,Python 的 全局解释器锁(GIL) 会限制线程的并行执行,这就使得多线程在 CPU 密集型任务上无法提供预期的性能提升;
  5. Python 的 全局解释器锁(GIL):

在 Python 中,全局解释器锁(GIL) 会影响多线程的并发执行。GIL 只允许一个线程在任何时刻执行 Python 字节码,这意味着对于 CPU 密集型任务,Python 的多线程不能利用多核 CPU 的计算能力,只能在单个核心上执行。

  1. 线程池的大小不可随便确定
    CPU 核心数:对于 CPU 密集型任务,线程池的大小通常不会超过 CPU 核心数,因为超出核心数的线程会增加上下文切换的开销,反而降低性能。
    I/O 密集型任务:对于 I/O 密集型任务,线程池的大小可以更大,因为线程在等待 I/O 操作完成时不会占用 CPU 资源,操作系统会将 CPU 分配给其他线程。
    内存和系统资源:每个线程都需要一定的内存和系统资源,过多的线程会导致内存消耗过大,甚至会导致系统资源耗尽。
  2. 线程可以跨多个 CPU 核心运行,操作系统会管理线程在核心之间的调度。

二.多进程

1. os.fork() 函数 (不推荐,不能跨平台,只能Linux、Unix )

它在当前进程的上下文中创建一个子进程。子进程是当前进程(称为父进程)的副本,但拥有自己的进程ID(PID)

fork() 的基本原理
  • 返回值
  • 在父进程中,fork() 返回新创建的子进程的进程 ID。
  • 在子进程中,fork() 返回 0。
  • 如果创建子进程失败,fork() 返回 -1,并抛出OSError。
  • 复制进程:子进程会复制父进程的内存空间,但实际上是采用写时复制(copy-on-write)机制,只有在需要写入时才会真正复制内存,从而节省资源。
示例

import os
import time

def main():
    pid = os.fork()  # 创建子进程

    if pid < 0:
        # fork() 失败
        print("Fork failed")
    elif pid == 0:
        # 子进程执行的代码
        print(f"Child process: My PID is {os.getpid()}")
        time.sleep(2)  # 模拟子进程工作
        print("Child process: Work done!")
    else:
        # 父进程执行的代码
        print(f"Parent process: My PID is {os.getpid()} and my child's PID is {pid}")
        os.wait()  # 等待子进程结束
        print("Parent process: Child has terminated.")

if __name__ == "__main__":
    main()

2.multiprocessing.Process()函数
  1. multiprocessing模块提供Process类实现新建进程

import multiprocessing
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
        
'''
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 4 starting
Worker 0 finished
Worker 1 finished
Worker 2 finished
Worker 3 finished
Worker 4 finished
'''

  1. pool = multiprocessing.Pool(processes=5) # 创建进程池实现多进程
    pool.apply()pool.apply_async()
  • apply():类似于顺序调用函数,它会阻塞,直到任务执行完成。
  • apply_async():异步执行任务,不会阻塞主程序。

pool.map()pool.map_async()

  • map():将函数 func 应用到 iterable 中的每一个元素,返回结果列表,并且是同步执行的。
  • map_async()map() 的异步版本,返回一个 AsyncResult 对象,允许你在执行期间继续做其他工作。

pool.close()pool.join()

  • close():\# 关闭进程池,表示不能在往进程池中添加进程
  • join():\# 等待进程池中的所有进程执行完毕,必须在close()之后调用

import multiprocessing
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')

if __name__ == '__main__':
    # 创建一个进程池,最多同时运行 5 个进程
    with multiprocessing.Pool(processes=5) as pool:
        # 使用 map 方法将任务分配给进程池,map 会阻塞,直到所有任务完成
        pool.map(worker, range(5))
        
        
        
###########利用.get()方法来阻塞和等待结果
import multiprocessing
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')
    return num * 2

if __name__ == '__main__':
    # 使用进程池
    with multiprocessing.Pool(processes=5) as pool:
        # 使用 apply_async 异步执行任务
        async_result = pool.apply_async(worker, (1,))
        print(f'Result of apply_async: {async_result.get()}')  # 阻塞等待结果

        # 使用 map_async 异步执行任务
        async_results = pool.map_async(worker, range(5))
        print(f'Result of map_async: {async_results.get()}')  # 阻塞等待所有结果
        
        
####################利用 pool.close()和pool.join()
import multiprocessing
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')
    return num

if __name__ == '__main__':
    # 创建进程池,最多同时运行 5 个进程
    resList = []
    with multiprocessing.Pool(processes=5) as pool:
        # 使用 apply_async 异步提交任务
        results = []
        for i in range(5):
            result = pool.apply_async(worker, (i,))
            results.append(result)  # 存储每个任务的 AsyncResult 对象
        
        # 关闭进程池,不再接受新任务
        pool.close()

        # 等待所有任务完成并获取结果
        for result in results:
            resList.append(result.get())  # 阻塞,等待每个任务的完成

        # 等待所有子进程结束
        pool.join()
    print(resList)
    
    '''
    Worker 0 starting
    Worker 1 starting
    Worker 2 starting
    Worker 3 starting
    Worker 4 starting
    Worker 2 finished
    Worker 0 finished
    Worker 1 finished
    Worker 3 finished
    Worker 4 finished
    [0, 1, 2, 3, 4]
    '''

  1. 多进程获取进程返回结果
    multiprocessing.Process 本身并不能直接返回值,我们需要Queue (队列)来实现收集结果

import multiprocessing
import time

def worker(num, queue):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')
    queue.put(num * 2)  # 将计算结果放入队列中

if __name__ == '__main__':
    processes = []
    queue = multiprocessing.Queue()  # 创建一个队列用于进程间通信
    
    # 创建并启动 5 个进程
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, queue))
        processes.append(p)
        p.start()

    # 等待所有进程完成
    for p in processes:
        p.join()

    # 获取并打印每个进程的返回值
    while not queue.empty():  # 确保队列中所有结果都已获取
        result = queue.get()
        print(f'Result from worker: {result}')

  1. 进程池获取返回结果
    Pool.map():是并行计算并按顺序返回结果,适合任务较简单且结果需要按顺序返回的场景;
    Pool.apply_async():是异步调用任务,适合任务较复杂或需要更灵活控制的场景。需要使用 get() 来获取任务结果。
    Pool.map_async():是 map() 的异步版本,返回一个 AsyncResult 对象。

#利用map
import multiprocessing
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')
    return num * 2  # 返回任务的结果

if __name__ == '__main__':
    # 创建一个进程池,最多同时运行 5 个进程
    with multiprocessing.Pool(processes=5) as pool:
        # 使用 map 并行地执行任务
        results = pool.map(worker, range(5))  # 返回每个进程的返回值列表
    
    # 打印所有结果
    print("Results:", results)
    
    
    
######利用get方法
import multiprocessing
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')
    return num * 2

if __name__ == '__main__':
    # 创建进程池
    with multiprocessing.Pool(processes=5) as pool:
        # 使用 apply_async 异步提交任务
        results = [pool.apply_async(worker, (i,)) for i in range(5)]

        # 获取每个任务的返回值
        output = [result.get() for result in results]  # 使用 get() 等待任务完成并获取结果

    print("Results:", output)
    '''
    Worker 0 starting
    Worker 1 starting
    Worker 2 starting
    Worker 3 starting
    Worker 4 starting
    Worker 0 finished
    Worker 2 finished
    Worker 1 finished
    Worker 3 finished
    Worker 4 finished
    Results: [0, 2, 4, 6, 8]
    '''
    
################错误map_async用法
import multiprocessing
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')
    return num * 2

if __name__ == '__main__':
    # 创建进程池
    with multiprocessing.Pool(processes=5) as pool:
        # 使用 map_async 异步提交任务
        results = [pool.map_async(worker, (i,)) for i in range(5)]

        # 获取每个任务的返回值
        output = [result.get() for result in results]  # 使用 get() 等待任务完成并获取结果

    print("Results:", output)
    
    '''
    Worker 0 starting
    Worker 2 starting
    Worker 1 starting
    Worker 3 starting
    Worker 4 starting
    Worker 2 finished
    Worker 0 finished
    Worker 1 finished
    Worker 3 finished
    Worker 4 finished
    Results: [[0], [2], [4], [6], [8]]
    '''

#########正确map_async用法
import multiprocessing
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')
    return num * 2

if __name__ == '__main__':
    # 创建进程池
    with multiprocessing.Pool(processes=5) as pool:
        # 使用 map_async 异步提交任务
        result = pool.map_async(worker, range(5)) 
        # 获取每个任务的返回值
        output = result.get()   # 使用 get() 等待任务完成并获取结果

    print("Results:", output)
    '''
        Worker 0 starting
        Worker 1 starting
        Worker 2 starting
        Worker 3 starting
        Worker 4 starting
        Worker 0 finished
        Worker 2 finished
        Worker 1 finished
        Worker 3 finished
        Worker 4 finished
        Results: [0, 2, 4, 6, 8]
    '''

注意:

上面的例子是同样的写法,但结果却不一样:

apply_async获取的结果是[0, 2, 4, 6, 8]

map_async获取的结果是[[0], [2], [4], [6], [8]]

为什么2个代码基本结构一样,但是执行的结构却不一样?

因为apply_async是异步产生5个独立的任务,每个任务都在独立的进程中运行,并且 apply_async() 返回的是 AsyncResult 对象;

map_async() 提供了一个 包含单个元素的元组 (i,),这意味着它会按顺序执行 5 次 map_async(),每次提交 单个任务,这与 apply_async() 的行为不同,实际上是将任务映射到整个可迭代对象的所有元素,它会在内部处理多个任务,而返回的是一个包含所有结果的 AsyncResult 对象,因此输出和任务提交的方式不一样;

通过这个例子能更好的理解apply_asyncmap_async()的区别

  1. 进程间通讯方式
    进程间通信(IPC, Inter-Process Communication) 是指多个进程之间交换数据和信息的机制。python中进程间通信方式,主要包括以下几种;
    Queue(队列):允许多个进程之间以生产者-消费者模式进行数据交换。
    Pipe(管道):通过管道的两端进行数据交换。
    Manager(共享内存):提供进程间共享对象(如共享字典、列表等)。
    Value 和 Array(共享内存):允许多个进程共享单一的变量或数组。
    Lock(锁):确保多个进程对共享资源的同步访问
  1. Queue(队列)

import multiprocessing
import time

def worker(num, queue):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')
    queue.put(num * 2)  # 将计算结果放入队列中

if __name__ == '__main__':
    processes = []
    queue = multiprocessing.Queue()  # 创建一个队列用于进程间通信
    
    # 创建并启动 5 个进程
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, queue))
        processes.append(p)
        p.start()

    # 等待所有进程完成
    for p in processes:
        p.join()

    # 获取并打印每个进程的返回值
    while not queue.empty():  # 确保队列中所有结果都已获取
        result = queue.get()
        print(f'Result from worker: {result}')

  1. Pipe(管道)
    Pipemultiprocessing 模块提供的另一种进程间通信方式,它通过管道的两端进行通信。管道有两个端口,一个用于发送数据(send),一个用于接收数据(recv)。

import multiprocessing
import time

def sender(conn):
    for i in range(5):
        print(f"Sender sending {i}")
        conn.send(i)  # 发送数据
        time.sleep(1)

def receiver(conn):
    while True:
        data = conn.recv()  # 接收数据
        if data == "DONE":
            break
        print(f"Receiver received {data}")

if __name__ == "__main__":
    # 创建管道,返回两个连接对象
    parent_conn, child_conn = multiprocessing.Pipe()

    # 创建进程
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))

    p1.start()
    p2.start()

    p1.join()
    parent_conn.send("DONE")  # 结束信号
    p2.join()
'''
Sender sending 0
Receiver received 0
Sender sending 1
Receiver received 1
Sender sending 2
Receiver received 2
Sender sending 3
Receiver received 3
Sender sending 4
Receiver received 4
'''

  1. Manager(共享内存)
    Managermultiprocessing 模块中提供的一个特殊对象,它允许不同进程共享数据。Manager 可以创建共享对象,比如共享的列表、字典等,这些对象会在进程间同步。

import multiprocessing
import time

def worker(shared_dict):
    for i in range(5):
        shared_dict[i] = f"Task {i}"
        print(f"Worker added: {shared_dict[i]}")
        time.sleep(1)

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()  # 创建一个共享字典

        p = multiprocessing.Process(target=worker, args=(shared_dict,))
        p.start()
        p.join()

        print(f"Shared dict: {shared_dict}")
'''
Worker added: Task 0
Worker added: Task 1
Worker added: Task 2
Worker added: Task 3
Worker added: Task 4
Shared dict: {0: 'Task 0', 1: 'Task 1', 2: 'Task 2', 3: 'Task 3', 4: 'Task 4'}
'''

  1. Value 和 Array(共享内存)
    ValueArraymultiprocessing 模块中的两种共享内存对象,可以用于在多个进程之间共享数据。Value 用于存储单一数据类型的值,而 Array 用于存储数组(类似于列表)。

import multiprocessing
import time

def worker(shared_value):
    for i in range(5):
        shared_value.value += 1  # 修改共享变量
        print(f"Worker incremented: {shared_value.value}")
        time.sleep(1)

if __name__ == "__main__":
    # 创建一个共享变量
    shared_value = multiprocessing.Value('i', 0)  # 'i' 表示整型

    p = multiprocessing.Process(target=worker, args=(shared_value,))
    p.start()
    p.join()

    print(f"Final shared value: {shared_value.value}")
    '''
    Worker incremented: 1
    Worker incremented: 2
    Worker incremented: 3
    Worker incremented: 4
    Worker incremented: 5
    Final shared value: 5
    '''

  1. Lock(锁)
    在多个进程共享数据时,往往需要确保数据的一致性和线程安全。Lock 是 Python 提供的一种同步原语,它用于确保同一时刻只有一个进程能够访问某个共享资源。

import multiprocessing
import time

def worker(lock, shared_value):
    for _ in range(5):
        with lock:# 使用锁来保证线程安全
            shared_value.value += 1
            print(f"Worker incremented: {shared_value.value}")
        time.sleep(1)

if __name__ == "__main__":
    shared_value = multiprocessing.Value('i', 0)  # 创建一个共享变量
    lock = multiprocessing.Lock()  # 创建一个锁

    processes = [multiprocessing.Process(target=worker, args=(lock, shared_value)) for _ in range(3)]
    
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"Final shared value: {shared_value.value}")

三.多线程

多线程是通过在同一进程中创建多个线程来并发执行任务。线程共享进程的内存空间,因此它们之间可以更容易地共享数据,但也容易引发线程安全问题。由于 GIL 的存在,Python 的多线程在 CPU 密集型任务上通常不能提高性能,但在 I/O 密集型任务上表现良好。

  1. 基本用法

Python 中可以使用 threading 模块来实现多线程。以下是一个简单的示例:

import threading
import time

def worker(num):
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

  1. 线程池
    线程池是一种设计模式,用于管理一定数量的线程来处理多个任务。线程池的目的是避免频繁地创建和销毁线程带来的开销,同时通过复用线程来提高程序的效率。
    在 Python 中,concurrent.futures.ThreadPoolExecutor 提供了线程池的功能,它会管理一定数量的线程,并按需调度这些线程来执行任务。

import concurrent.futures
import time

def worker(num):
    print(f"Worker {num} starting")
    time.sleep(2)
    print(f"Worker {num} finished")
    return num * 2  # 返回结果

# 使用线程池执行任务
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = [executor.submit(worker, i) for i in range(5)]

    # 获取每个任务的结果
    output = [result.result() for result in results]
    print("Results:", output)
    
    '''
    Worker 0 starting
    Worker 1 starting
    Worker 2 starting
    Worker 2 finished
    Worker 0 finished
    Worker 3 starting
    Worker 4 starting
    Worker 1 finished
    Worker 4 finished
    Worker 3 finished
    Results: [0, 2, 4, 6, 8]
    '''

  1. 线程锁

import threading
import time

# 共享资源(全局变量)
shared_value = 0

# 创建一个锁对象
lock = threading.Lock()

# 线程工作函数
def increment():
    global shared_value
    # 获取锁,防止其他线程访问共享资源
    lock.acquire()  # 获取锁
    try:
        print(f"Thread {threading.current_thread().name} is incrementing shared_value")
        current_value = shared_value
        time.sleep(0.5)  # 模拟一些工作
        shared_value = current_value + 1
        print(f"Thread {threading.current_thread().name} updated shared_value to {shared_value}")
    finally:
        # 无论是否发生异常,都要释放锁
        lock.release()  # 释放锁

if __name__ == "__main__":
    # 创建多个线程
    threads = [threading.Thread(target=increment, name=f"Thread-{i}") for i in range(5)]

    # 启动线程
    for t in threads:
        t.start()

    # 等待所有线程完成
    for t in threads:
        t.join()

    print(f"Final shared_value: {shared_value}")

五.总结

Python 多线程、多进程初学编程的人可能觉得很复杂 其实静下来,理解他 就知道原来就这么回事,没啥!重点是要理解什么是进程什么是线程!!理解了根据真实项目 照着写就行!切记学编程不是死记硬背!如果你采用死记硬背的方式来学编程,那么你完蛋了!!!你永远背不完!!chatgpt也有错误。。何况人。。人与chatgpt的区别是理解,是思想。。

创作整理不易,请大家多多关注 多多点赞,有写的不对的地方欢迎大家补充,我来整理,再次感谢!

举报

相关推荐

0 条评论