0
点赞
收藏
分享

微信扫一扫

进程池Pool结合Manager中的Queue队列的使用

Jonescy 2022-02-07 阅读 90
python队列

进程池Pool:

        当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是成百上千的进程目标,手动创建的工作量将非常巨大,此时可用到multiprocessing模块中的Pool类。

        初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;如果池中的进程数已经达到指定的最大值,那么该请求就会等待,知道池中有进程结束,才会用之前结束的进程来执行新的任务。

Pool常用方法:

        apply_async(func[,args[,kwds]]):使用非阻塞方式调用func(并行执行,阻塞方式必须等待上

                   一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给

                   func的关键字擦数列表。 

        close():关闭Pool,使其不再接收新的任务。

        terminate():不管任务是否完成,立即终止。

        join():主进程阻塞,等待子进程退出,必须在close()或terminate()之后使用。

一.multiprocessing模块中进程池Pool的使用:

        from multiprocessing import Pool

        pool = Pool(进程池中的进程数)

        pool.apply_async(func=任务函数,args(元组类参数,))

        pool.close()

        pool.join()

from multiprocessing import Pool
import os, time, random


def work1(msg):
    start_time = time.time()  # 开始时间
    print("循环任务%d由进程号%d进程执行" % (msg, os.getpid()))
    time.sleep(random.random())  # 随机生产0-1的浮点数
    end_time = time.time()  # 结束时间
    print(msg, "执行完毕,耗时%0.2f" % (end_time - start_time))


if __name__ == '__main__':
    pool = Pool(3)  # 定义一个进程池,最大进程数为3
    for i in range(1, 11):
        # 每次循环都会用空闲出来的子进程去调用目标任务(函数)
        pool.apply_async(func=work1, args=(i,))  
    print("------start------")
    pool.close()  # 关闭进程池,关闭后pool不再接收新的请求任务
    pool.join()  # 等待pool进程池中所有的子进程执行完成,必须放在pool.close()之后
    print("-----end------")


输出:
------start------
循环任务1由进程号18809进程执行
循环任务2由进程号18810进程执行
循环任务3由进程号18811进程执行
3 执行完毕,耗时0.42
循环任务4由进程号18811进程执行
1 执行完毕,耗时0.47
循环任务5由进程号18809进程执行
2 执行完毕,耗时0.54
循环任务6由进程号18810进程执行
4 执行完毕,耗时0.19
循环任务7由进程号18811进程执行
7 执行完毕,耗时0.06
循环任务8由进程号18811进程执行
8 执行完毕,耗时0.12
循环任务9由进程号18811进程执行
5 执行完毕,耗时0.82
循环任务10由进程号18809进程执行
6 执行完毕,耗时0.86
9 执行完毕,耗时0.85
10 执行完毕,耗时0.65
-----end------

总结:进程池中的3个进程,在循环执行任务时,初始时3个进程会同时启动并执行任务1,2,3;当进程18811率先执行完毕任务1时,会紧接着执行任务4。

_____________________________________________________________________________

二.进程池中的进程间通信问题:

进程池Pool结合Manager中Queue的使用:

进程池中的Queue:如果要使用Pool创建进程,就需要使用到multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()。

from multiprocessing import Pool, Manager
import os, time, random


def reader(q):
    print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    print("reader进程%s从Queue中获取到的信息为:%s" % (os.getpid(), q.get()))


def writer(q):
    print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in range(1, 11):
        q.put(i)


if __name__ == '__main__':
    print("(%s) start" % os.getpid())  # 打印主进程(父进程)
    pool = Pool(5)  # 创建进程池
    q = Manager().Queue()  # 使用Manager中的Queue
    pool.apply_async(func=writer, args=(q,))
    time.sleep(1)  # 先让writer任务完队列中生产数据,然后再用reader任务从队列中消费数据。
    while True:
        if q.qsize() > 0:
            pool.apply_async(func=reader, args=(q,))
        else:
            break
    pool.close()
    pool.join()

输出:
(23003) start
writer启动(23005),父进程为(23003)
reader启动(23005),父进程为(23003)
reader进程23005从Queue中获取到的信息为:1
reader启动(23005),父进程为(23003)
reader进程23005从Queue中获取到的信息为:2
reader启动(23005),父进程为(23003)
reader进程23005从Queue中获取到的信息为:3
reader启动(23005),父进程为(23003)
reader进程23005从Queue中获取到的信息为:4
reader启动(23005),父进程为(23003)
reader进程23005从Queue中获取到的信息为:5
reader启动(23005),父进程为(23003)
reader进程23005从Queue中获取到的信息为:6
reader启动(23006),父进程为(23003)
reader启动(23007),父进程为(23003)
reader启动(23009),父进程为(23003)
reader进程23006从Queue中获取到的信息为:7
reader启动(23008),父进程为(23003)
reader进程23007从Queue中获取到的信息为:8
reader进程23009从Queue中获取到的信息为:9
reader启动(23005),父进程为(23003)
reader进程23005从Queue中获取到的信息为:10
reader启动(23007),父进程为(23003)
reader启动(23009),父进程为(23003)
reader启动(23006),父进程为(23003)
reader启动(23005),父进程为(23003)

总结:进程池中的5个进程,根据Manager.Queue()队列中的数据,被动态分配给writer和reader任务,进程23005~23009共计5个进程,都轮流执行了reader任务。

举报

相关推荐

0 条评论