0
点赞
收藏
分享

微信扫一扫

Python的异步asynico

kolibreath 2022-01-15 阅读 49

文章目录

1.定义

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持

2.创建协程

from collections.abc import Coroutine

async def hello(name):
    print('Hello,', name)

if __name__ == '__main__':
    # 生成协程对象,并不会运行函数内的代码
    coroutine = hello("World")

    # 检查是否是协程 Coroutine 类型
    print(isinstance(coroutine, Coroutine))  # True

3.常见概念

名称含义
event_loop 事件循环程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。当满足事件发生的时候,调用相应的协程函数
coroutine 协程协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用
future 对象代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
task 任务个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。Task 对象是 Future 的子类,它将 coroutine 和 Future 联系在一起,将 coroutine 封装成一个 Future 对象
async/await 关键字python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口,其作用在一定程度上类似于yield

4.基本使用

import asyncio

async def hello(name):
    print('Hello,', name)

# 定义协程对象
coroutine = hello("World")

# 定义事件循环对象容器
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine)

# 将协程转为task任务
task = loop.create_task(coroutine)

# 将task任务扔进事件循环对象中并触发
loop.run_until_complete(task)

输出

Hello, World

5.绑定回调函数

import time
import asyncio


async def _sleep(x):
    time.sleep(2)
    return '暂停了{}秒!'.format(x)

def callback(future):
    print('这里是回调函数,获取返回结果是:', future.result())

coroutine = _sleep(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)

# 添加回调函数
task.add_done_callback(callback)

loop.run_until_complete(task)

输出

这里是回调函数,获取返回结果是: 暂停了2秒!

6.协程中的并发

import asyncio


async def do_work(x):
    print("waiting", x)
    await asyncio.sleep(x)
    return f"done after {x}s "


loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(do_work(1)),
         asyncio.ensure_future(do_work(2)),
         ]
loop.run_until_complete(asyncio.gather(*tasks))
# loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
    print("Task rest", task.result())

7.协程中的嵌套

import asyncio


async def do_work(x):
    print("waiting", x)
    await asyncio.sleep(x)
    return f"done after {x}s "


async def main():
    tasks = [asyncio.ensure_future(do_work(1)),
             asyncio.ensure_future(do_work(2)),
             ]
    done, pending = await asyncio.wait(tasks)
    for task in done:
        print("Task rest", task.result())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

8.gather和wait的区别

方面waitgather
接受的参数coroutine和task对象coroutine和task对象
返回结果dones(已完成的任务),pending(未完成的任务)dones(已完成的任务)
控制功能reuturn_when,timeout

9.动态添加协程

import time
import asyncio
from queue import Queue
from threading import Thread


async def do_work(x, queue: Queue, msg=""):
    await asyncio.sleep(x)
    queue.put(msg)


def start_loop(loop: asyncio.ProactorEventLoop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


if __name__ == '__main__':
    queue = Queue()
    loop = asyncio.new_event_loop()

    my_thread = Thread(target=start_loop, args=(loop,))
    my_thread.start()
	#不能加join否者不能执行
    print(time.ctime())

    asyncio.run_coroutine_threadsafe(do_work(1, queue, "第一个"), loop)
    asyncio.run_coroutine_threadsafe(do_work(2, queue, "第二个"), loop)
    
    while True:
        msg = queue.get()
        print(f"{msg} 协程运行完成...")
        print(time.ctime())

10.redis实现动态任务添加

import time
import redis
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
    # 一个在后台永远运行的事件循环
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_sleep(x, queue):
    await asyncio.sleep(x)
    queue.put("ok")

def get_redis():
    connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
    return redis.Redis(connection_pool=connection_pool)

def consumer():
    while True:
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)


if __name__ == '__main__':
    print(time.ctime())
    new_loop = asyncio.new_event_loop()

    # 定义一个线程,运行一个事件循环对象,用于实时接收新任务
    loop_thread = Thread(target=start_loop, args=(new_loop,))
    loop_thread.setDaemon(True)
    loop_thread.start()
    # 创建redis连接
    rcon = get_redis()

    queue = Queue()

    # 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务
    consumer_thread = Thread(target=consumer)
    consumer_thread.setDaemon(True)
    consumer_thread.start()

    while True:
        msg = queue.get()
        print("协程运行完..")
        print("当前时间:", time.ctime())
举报

相关推荐

0 条评论