import random
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import threading
import time
# lock = threading.RLock() #调用threading模块中的RLock()
# lock.acquire() #开始给线程加锁
# # do somthing
# lock.release() #给线程解锁
# with threading.Lock():
# do something
# 定义一个准备作为线程任务的函数
def thread_task(element):
"""
线程任务
:param element:
:return:
"""
time.sleep(random.randint(0, 3))
print("{} is done!".format(element))
def thread_put_element(que: Queue, elements: list):
"""
线程函数,将元素放进队列
:param que:队列
:param elements: 列表(数据量大的情况下,迭代器可能会更好)
:return:None
"""
for i in elements:
que.put(i)
print("put element:", i)
def main(threads_nums=3, queue_max=5, time_wait=5, *args, **kwargs) -> None:
"""
主程序
:param threads_nums:线程池线程数
:param queue_max:元素队列最大长度
:param time_wait:如果队列满了,等待 time_wait 秒后,再分发任务.
time_wait 一般设置为一个队列元素相对应的 task 时间
避免一次性把所有任务都分发给线程池,让大部分任务都在内存中处于等待状态
:param args: else you want
:param kwargs: else you want
:return: None
"""
queue_element = Queue(maxsize=queue_max) # 创建一个限制长度的队列
elements = [i for i in range(10)] # 生成队列元素
t_put_queue = threading.Thread(target=thread_put_element, # 单独建一个线程用于将数据元素放入队列
args=(queue_element, elements),
daemon=True
) # daemon=True 设置守护线程,主线程结束,此线程也一起结束
t_put_queue.start()
ALL_ELEMENTS_NUM = len(elements) # 保存所有队列元素个数,用于退出主线程
task_num = 0 # 任务计数
# 创建一个包含 threads_nums 条线程的线程池
with ThreadPoolExecutor(max_workers=threads_nums) as pool:
while True:
if task_num >= ALL_ELEMENTS_NUM: # 如果所有 task 都分发完了,退出主线程循环
break
task_num += 1
if queue_element.full(): # 如果队列是满的,等待 time_wait 秒
print("queue is crowed,wait for {} secs...".format(time_wait))
time.sleep(time_wait)
element = queue_element.get() # 从队列获取元素,submit给线程池执行
print("[{}th] [submit-{}]:".format(task_num, element))
pool.submit(thread_task, element) # 任务submit
if __name__ == '__main__':
main()