一. asyncio 模块基本操作
1.1 任务状态
上一节我们提到 asyncio 的任务使用协程对象作为参数创建。并且任务含有多种状态。下面我们使用一个简单的例子来说明任务的各种状态。
import time
import asyncio
@asyncio.coroutine
def do_some_work():
print('Coroutine start.')
time.sleep(3)
print('Coroutine finished.')
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine = do_some_work()
task = loop.create_task(coroutine) # 创建任务
print('task is instance of asyncio.Task?', 'yes' if isinstance(task, asyncio.Task) else 'No')
print(f'task state {task._state}')
loop.run_until_complete(task)
print(f'task state {task._state}')
end = time.time()
print(f'运行耗时: {end-start:.2f}')
if __name__ == '__main__':
main()
运行结果:事件循环的 create_task
方法可以创建任务,另外
asyncio.ensure_future
方法也可以创建任务,参数须为协程对象。
task
是 asyncio.Task
类的实例,创建 task
可以方便预激协程以及处理协程运行中遇到的异常。task
对象的 _state
属性保存当前任务的运行状态,任务的运行状态有 PENDING
和 FINISHED
两种。
1.2 async / await关键字
Python3.5 新增的 async
和 await
关键字可以用来定义协程函数。这两个关键字是一个组合,其作用等同于 @asyncio.coroutine
装饰器和 yield from
语句。以便将协程函数和生成器函数在语法上做出明显的区分。
import time
import asyncio
async def sleep_3s():
time.sleep(3)
async def do_some_work():
print('Coroutine start.')
# 通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果
await sleep_3s()
print('Coroutine finished.')
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine = do_some_work()
# 创建任务
task = loop.create_task(coroutine)
print('task is instance of asyncio.Task?', 'yes' if isinstance(task, asyncio.Task) else 'No')
print(f'task state {task._state}')
loop.run_until_complete(task)
print(f'task state {task._state}')
end = time.time()
print(f'运行耗时: {end - start:.2f}')
if __name__ == '__main__':
main()
1.3 绑定回调
假设协程包含一个 IO 操作(这几乎是肯定的),等它处理完数据后,我们希望得到通知,以便下一步数据处理。
这一需求可以通过向 future
对象添加回调实现。那什么是 future
对象呢?task
对象就是 future
对象,因为 asyncio.Task
是 asyncio.Future
的子类。
因此 task
对象也可以添加回调函数。回调函数的最后一个参数是 future
或 task
对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以使用 functools.partial
偏导函数传入。
import asyncio
import time
from functools import partial
async def coro_work():
print('coro_work -> Coroutine start.')
time.sleep(3)
print('coro-work -> Coroutine finished.')
def callback(name, task):
print(f'callback -> {task._state}')
print(f'callback -> {name}')
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine = coro_work()
task = loop.create_task(coroutine)
task.add_done_callback(partial(callback, 'Coroutine, Bye Bye~'))
loop.run_until_complete(task)
end = time.time()
print(f'运行耗时:{end - start:.2f}')
if __name__ == '__main__':
main()
运行结果:使用 async
关键字替代 asyncio.coroutine
装饰器创建协程函数。callback
为回调函数,协程终止后需要运行的代码写入回调函数,回调函数的参数有要求,最后一个位置参数须为 task
对象。
task
对象的 add_done_callback
方法可以添加回调函数,注意参数必须是回调函数,这个方法不能传入回调函数的参数,这一点需要通过 functools
模块的 partial
方法解决,将回调函数和其参数 name
作为 partial
方法的参数,此方法的返回值就是偏函数,偏函数可作为 task.add_done_callback
方法的参数。
二. 协程处理多任务
开始介绍 asyncio
模块到现在,我们还没有使用协程处理多任务。在实际项目中,往往有多个协程对象,并创建多个任务,同时在一个 loop
里运行。为了把多个协程交给 loop
,需要借助 asyncio.gather
方法。任务的 result
方法可以获得对应协程函数的 return
值。
import asyncio
import time
async def coro_work(name, t):
print(f'[coro_work] Coroutine {name} start.')
await asyncio.sleep(t)
print(f'[coro_work] Coroutine {name} finished.')
return f'Coroutine {name} OK.'
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine1 = coro_work('ONE', 3)
coroutine2 = coro_work('TWO', 1)
task1 = loop.create_task(coroutine1)
task2 = asyncio.ensure_future(coroutine2)
gather = asyncio.gather(task1, task2)
loop.run_until_complete(gather)
print(f'[task1 result] {task1.result()}')
print(f'[task2 result] {task2.result()}')
end = time.time()
print(f'运行耗时:{end - start:.4f}')
if __name__ == '__main__':
main()
await
关键字等同于 Python3.4 中的 yield from
语句,后面接协程对象。asyncio.sleep
方法的返回值为协程对象,此处为阻塞运行。
协程函数的 return
值在协程运行结束后通过调用对应 task
对象的 result
方法返回。
asyncio.gather
方法接收多个 task
作为参数,创建任务搜集器。
run_until_complete
方法也可接收任务搜集器作为参数,并阻塞运行,直到全部任务完成。任务结束后,事件循环终止,打印任务的 result
方法返回值,即协程函数的 return
值。
运行结果:在事件循环内部,2 个协程是交替运行完成的:首先运行 task1
,打印 [coro_work] Coroutine ONE start.
,task1
运行到 asyncio.sleep
阻塞,让步 CPU 的使用权给 task2
执行,打印 [coro_work] Coroutine TWO start.
,task2
运行到 asyncio.sleep
阻塞,再次让步 CPU 的使用权,但此刻事件循环发现所有协程都处于阻塞状态,只能等待阻塞结束。
task2
的阻塞时间较短,阻塞 1s 后结束,打印 [coro_work] Coroutine TWO finished.
;又过了 2s,阻塞 3s 的 task1 也结束了,打印 [coro_work] Coroutine ONE finished.
。
至此,2 个任务全部完成,事件循环停止,打印 task1 和 task2 的返回值,任务总耗时约 3s,如果使用单线程同步模型则至少 4s 。
注意:
- 多数情况下无需调用
task
的add_done_callback
方法,可以直接把回调函数中的代码写入await
语句后面,协程是可以暂停和恢复的。 - 多数情况下同样不需要调用
task
的result
方法获取协程函数的return
值,因为事件循环的run_until_complete
方法的返回值就是协程函数的返回值。 - 事件循环有一个
stop
方法来停止循环和一个close
方法来关闭循环。以上示例均没有调用loop.close
方法,似乎并没有什么问题,那调用loop.close
是否是必须的呢?
简言之,loop
只要不关闭,就可以再次运行run_until_complete()
方法,关闭后则不可运行。有人建议调用loop.close
,以彻底清理loop
对象防止误用,其实多数情况下并无必要。 -
asyncio
提供了asyncio.gather
和asyncio.wait
两个任务搜集方法,它们的作用相同,都是将协程任务按顺序排定,再将返回值作为参数加入到事件循环中。
二者的主要区别在于:asyncio.wait
可以获取任务的执行状态PENDING
FINISHED
。当有一些特殊需求,比如某些情况下取消任务,可以使用asyncio.wait
搜集器。
三. 取消任务
在事件循环启动之后,停止之前,我们可以手动取消任务的执行,但注意只有 PENDING
状态的任务才允许取消,FINISHED
状态的任务已经完成,自然无法取消。
import asyncio
async def work(id, t):
print('Working...')
await asyncio.sleep(t)
print(f'Work {id} done.')
def main():
loop = asyncio.get_event_loop()
coroutines = [work(i, i) for i in range(1, 4)]
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except KeyboardInterrupt:
loop.stop() # 取消所有未完成的任务,停止事件循环
finally:
loop.close() # 关闭事件循环
if __name__ == '__main__':
main()
运行结果:程序运行过程中,按 Ctrl + C
会触发 KeyboardInterrupt
异常。捕获这个异常,将取消所有未完成的任务。
除了使用事件循环的 stop
方法取消所有未完成的任务,还可以直接调用任务的 cancel
方法,而 asyncio.Task.all_tasks
方法可以获得事件循环中的全部任务。
下面,我们修改上述实例的 main()
函数代码:
import asyncio
async def work(id, t):
print('Working...')
await asyncio.sleep(t)
print(f'Work {id} done.')
def main():
loop = asyncio.get_event_loop()
coroutines = [work(i, i) for i in range(1, 4)]
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except KeyboardInterrupt:
# loop.stop() # 取消所有未完成的任务,停止事件循环
print()
tasks = asyncio.Task.all_tasks()
for task in tasks:
print(f'正在取消任务:{task}')
print(f'任务取消:{task.cancel()}')
finally:
loop.close() # 关闭事件循环
if __name__ == '__main__':
main()
运行结果: 程序运行到 work 1 done
输出时,按下 Ctrl + C
会触发 KeyboardInterrupt
异常。asyncio.Task.all_tasks()
可以捕获事件循环中的所有任务的集合,任务状态有 PENDING
和 FINISHED
两者。任务的 cancel
方法可以取消未完成的任务,取消成功返回 True
,已完成的任务由于取消失败返回 False
。