文章目录
1.定义
asyncio
是Python 3.4版本引入的标准库,直接内置了对异步IO的支持
2.创建协程
from collections.abc import Coroutine
async def hello(name):
print('Hello,', name)
if __name__ == '__main__':
# 生成协程对象,并不会运行函数内的代码
coroutine = hello("World")
# 检查是否是协程 Coroutine 类型
print(isinstance(coroutine, Coroutine)) # True
3.常见概念
名称 | 含义 |
---|---|
event_loop 事件循环 | 程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。当满足事件发生的时候,调用相应的协程函数 |
coroutine 协程 | 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用 |
future 对象 | 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别 |
task 任务 | 个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。Task 对象是 Future 的子类,它将 coroutine 和 Future 联系在一起,将 coroutine 封装成一个 Future 对象 |
async/await 关键字 | python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口,其作用在一定程度上类似于yield |
4.基本使用
import asyncio
async def hello(name):
print('Hello,', name)
# 定义协程对象
coroutine = hello("World")
# 定义事件循环对象容器
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine)
# 将协程转为task任务
task = loop.create_task(coroutine)
# 将task任务扔进事件循环对象中并触发
loop.run_until_complete(task)
输出
Hello, World
5.绑定回调函数
import time
import asyncio
async def _sleep(x):
time.sleep(2)
return '暂停了{}秒!'.format(x)
def callback(future):
print('这里是回调函数,获取返回结果是:', future.result())
coroutine = _sleep(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
# 添加回调函数
task.add_done_callback(callback)
loop.run_until_complete(task)
输出
这里是回调函数,获取返回结果是: 暂停了2秒!
6.协程中的并发
import asyncio
async def do_work(x):
print("waiting", x)
await asyncio.sleep(x)
return f"done after {x}s "
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(do_work(1)),
asyncio.ensure_future(do_work(2)),
]
loop.run_until_complete(asyncio.gather(*tasks))
# loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print("Task rest", task.result())
7.协程中的嵌套
import asyncio
async def do_work(x):
print("waiting", x)
await asyncio.sleep(x)
return f"done after {x}s "
async def main():
tasks = [asyncio.ensure_future(do_work(1)),
asyncio.ensure_future(do_work(2)),
]
done, pending = await asyncio.wait(tasks)
for task in done:
print("Task rest", task.result())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
8.gather和wait的区别
方面 | wait | gather |
---|---|---|
接受的参数 | coroutine和task对象 | coroutine和task对象 |
返回结果 | dones(已完成的任务),pending(未完成的任务) | dones(已完成的任务) |
控制功能 | reuturn_when,timeout |
9.动态添加协程
import time
import asyncio
from queue import Queue
from threading import Thread
async def do_work(x, queue: Queue, msg=""):
await asyncio.sleep(x)
queue.put(msg)
def start_loop(loop: asyncio.ProactorEventLoop):
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == '__main__':
queue = Queue()
loop = asyncio.new_event_loop()
my_thread = Thread(target=start_loop, args=(loop,))
my_thread.start()
#不能加join否者不能执行
print(time.ctime())
asyncio.run_coroutine_threadsafe(do_work(1, queue, "第一个"), loop)
asyncio.run_coroutine_threadsafe(do_work(2, queue, "第二个"), loop)
while True:
msg = queue.get()
print(f"{msg} 协程运行完成...")
print(time.ctime())
10.redis实现动态任务添加
import time
import redis
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(x, queue):
await asyncio.sleep(x)
queue.put("ok")
def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
return redis.Redis(connection_pool=connection_pool)
def consumer():
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)
if __name__ == '__main__':
print(time.ctime())
new_loop = asyncio.new_event_loop()
# 定义一个线程,运行一个事件循环对象,用于实时接收新任务
loop_thread = Thread(target=start_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
# 创建redis连接
rcon = get_redis()
queue = Queue()
# 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务
consumer_thread = Thread(target=consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()
while True:
msg = queue.get()
print("协程运行完..")
print("当前时间:", time.ctime())