0
点赞
收藏
分享

微信扫一扫

python 线程池与队列简单应用


import random
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import threading
import time


# lock = threading.RLock()    #调用threading模块中的RLock()
# lock.acquire()      #开始给线程加锁
# # do somthing
# lock.release()      #给线程解锁

# with threading.Lock():
#     do something


# 定义一个准备作为线程任务的函数
def thread_task(element):
    """
    线程任务
    :param element:
    :return:
    """
    time.sleep(random.randint(0, 3))
    print("{} is done!".format(element))


def thread_put_element(que: Queue, elements: list):
    """
    线程函数,将元素放进队列
    :param que:队列
    :param elements: 列表(数据量大的情况下,迭代器可能会更好)
    :return:None
    """
    for i in elements:
        que.put(i)
        print("put element:", i)


def main(threads_nums=3, queue_max=5, time_wait=5, *args, **kwargs) -> None:
    """
    主程序
    :param threads_nums:线程池线程数
    :param queue_max:元素队列最大长度
    :param time_wait:如果队列满了,等待 time_wait 秒后,再分发任务.
                    time_wait 一般设置为一个队列元素相对应的 task 时间
                    避免一次性把所有任务都分发给线程池,让大部分任务都在内存中处于等待状态
    :param args: else you want
    :param kwargs: else you want
    :return: None
    """

    queue_element = Queue(maxsize=queue_max)  # 创建一个限制长度的队列
    elements = [i for i in range(10)]  # 生成队列元素
    t_put_queue = threading.Thread(target=thread_put_element,  # 单独建一个线程用于将数据元素放入队列
                                   args=(queue_element, elements),
                                   daemon=True
                                   )  # daemon=True 设置守护线程,主线程结束,此线程也一起结束
    t_put_queue.start()

    ALL_ELEMENTS_NUM = len(elements)  # 保存所有队列元素个数,用于退出主线程
    task_num = 0  # 任务计数

    # 创建一个包含 threads_nums 条线程的线程池
    with ThreadPoolExecutor(max_workers=threads_nums) as pool:
        while True:
            if task_num >= ALL_ELEMENTS_NUM:  # 如果所有 task 都分发完了,退出主线程循环
                break
            task_num += 1
            if queue_element.full():  # 如果队列是满的,等待 time_wait 秒
                print("queue is crowed,wait for {} secs...".format(time_wait))
                time.sleep(time_wait)
            element = queue_element.get()  # 从队列获取元素,submit给线程池执行
            print("[{}th] [submit-{}]:".format(task_num, element))
            pool.submit(thread_task, element)  # 任务submit


if __name__ == '__main__':
    main()

举报

相关推荐

0 条评论