介绍
- 安装:
pip3 install threadpool
- 导入线程池:
from concurrent.futures import ThreadPoolExecutor
- 导入进程池:
from concurrent.futures import ProcessPoolExecutor
- 以下用线程池作为示例, 进程池用法和线程池大致相同
示例
异步回调
- 进程池容量为 3
- 异步处理 task()
- 异步回调 after()
from concurrent.futures import ThreadPoolExecutor
import time
import random
# 需要并发执行的函数
def task(num, x):
time.sleep(random.randint(1, 3))
return num, x
# 这个是进程池的回调函数, 参数需要用 x.result() 取值
def after(x):
print("%s, %s" % (x.result()[0], x.result()[1]))
if __name__ == '__main__':
# 容量为 3 的线程池
# p = ThreadPoolExecutor(max_workers=3) # 可以指定关键字参数
p = ThreadPoolExecutor(3)
# 异步回调
for i in range(10):
# 如果不需要回调, 则不需要执行 add_done_callback
# 这里是传多个参数的示例
p.submit(task, i, 1).add_done_callback(after)
# 等同于 t.join()
p.shutdown()
print("end")
同步回调
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, random, os
def task(num, x):
time.sleep(random.randint(1, 3))
return num, x
# 这个函数和异步回调的不同是, 不需要使用 result()取值, 如果是多个返回值, 则参数为tuple
def after(x):
print("%s, %s" % (x[0], x[1]))
if __name__ == "__main__":
p = ProcessPoolExecutor(3) # 参数和机器 cpu 核数相同即可
for i in range(5):
# 每次开进程都需要等拿到结果 r, 并且处理
r = p.submit(task, i).result()
after(r)
# 相当于 join()
p.shutdown()
print("end")
map的用法
import time
import random
from concurrent.futures import ThreadPoolExecutor # 线程池
def task(num, x):
time.sleep(random.randint(1, 3))
print(num, x)
if __name__ == '__main__':
p = ThreadPoolExecutor(3)
# 用法: map(func, iterable, iterable)
p.map(task, range(1, 12), range(1, 5))
p.shutdown()
print("end")