0
点赞
收藏
分享

微信扫一扫

Python实现协程(五)

楚木巽 2021-09-29 阅读 85

一. asyncio 模块基本操作

1.1 任务状态

上一节我们提到 asyncio 的任务使用协程对象作为参数创建。并且任务含有多种状态。下面我们使用一个简单的例子来说明任务的各种状态。

import time
import asyncio
 
 
@asyncio.coroutine
def do_some_work():
    print('Coroutine start.')
    time.sleep(3)
    print('Coroutine finished.')
 
 
def main():
    start = time.time()
 
    loop = asyncio.get_event_loop()
    coroutine = do_some_work()
    task = loop.create_task(coroutine)  # 创建任务
    print('task is instance of asyncio.Task?', 'yes' if isinstance(task, asyncio.Task) else 'No')
    print(f'task state {task._state}')
    loop.run_until_complete(task)
    print(f'task state {task._state}')
 
    end = time.time()
    print(f'运行耗时: {end-start:.2f}')
 
 
if __name__ == '__main__':
    main()

运行结果:事件循环的 create_task 方法可以创建任务,另外
asyncio.ensure_future 方法也可以创建任务,参数须为协程对象。

taskasyncio.Task 类的实例,创建 task 可以方便预激协程以及处理协程运行中遇到的异常。task 对象的 _state 属性保存当前任务的运行状态,任务的运行状态有 PENDINGFINISHED 两种。

1.2 async / await关键字

Python3.5 新增的 asyncawait 关键字可以用来定义协程函数。这两个关键字是一个组合,其作用等同于 @asyncio.coroutine 装饰器和 yield from 语句。以便将协程函数和生成器函数在语法上做出明显的区分。

import time
import asyncio


async def sleep_3s():
    time.sleep(3)


async def do_some_work():
    print('Coroutine start.')
    # 通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果
    await sleep_3s()
    print('Coroutine finished.')


def main():
    start = time.time()
    loop = asyncio.get_event_loop()
    coroutine = do_some_work()
    # 创建任务
    task = loop.create_task(coroutine)

    print('task is instance of asyncio.Task?', 'yes' if isinstance(task, asyncio.Task) else 'No')
    print(f'task state {task._state}')
    loop.run_until_complete(task)
    print(f'task state {task._state}')

    end = time.time()
    print(f'运行耗时: {end - start:.2f}')


if __name__ == '__main__':
    main()

1.3 绑定回调

假设协程包含一个 IO 操作(这几乎是肯定的),等它处理完数据后,我们希望得到通知,以便下一步数据处理。

这一需求可以通过向 future 对象添加回调实现。那什么是 future 对象呢?task 对象就是 future 对象,因为 asyncio.Taskasyncio.Future 的子类。

因此 task 对象也可以添加回调函数。回调函数的最后一个参数是 futuretask 对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以使用 functools.partial 偏导函数传入。

import asyncio
import time
from functools import partial
 
 
async def coro_work():
    print('coro_work -> Coroutine start.')
    time.sleep(3)
    print('coro-work -> Coroutine finished.')
 
 
def callback(name, task):
    print(f'callback -> {task._state}')
    print(f'callback -> {name}')
 
 
def main():
    start = time.time()
    loop = asyncio.get_event_loop()
    coroutine = coro_work()
    task = loop.create_task(coroutine)
    task.add_done_callback(partial(callback, 'Coroutine, Bye Bye~'))
    loop.run_until_complete(task)
    end = time.time()
    print(f'运行耗时:{end - start:.2f}')
 
 
if __name__ == '__main__':
    main()

运行结果:使用 async 关键字替代 asyncio.coroutine 装饰器创建协程函数。callback 为回调函数,协程终止后需要运行的代码写入回调函数,回调函数的参数有要求,最后一个位置参数须为 task 对象。

task 对象的 add_done_callback 方法可以添加回调函数,注意参数必须是回调函数,这个方法不能传入回调函数的参数,这一点需要通过 functools 模块的 partial 方法解决,将回调函数和其参数 name 作为 partial 方法的参数,此方法的返回值就是偏函数,偏函数可作为 task.add_done_callback 方法的参数。

二. 协程处理多任务

开始介绍 asyncio 模块到现在,我们还没有使用协程处理多任务。在实际项目中,往往有多个协程对象,并创建多个任务,同时在一个 loop 里运行。为了把多个协程交给 loop ,需要借助 asyncio.gather 方法。任务的 result 方法可以获得对应协程函数的 return 值。

import asyncio
import time


async def coro_work(name, t):
    print(f'[coro_work] Coroutine {name} start.')
    await asyncio.sleep(t)
    print(f'[coro_work] Coroutine {name} finished.')
    return f'Coroutine {name} OK.'


def main():
    start = time.time()
    loop = asyncio.get_event_loop()

    coroutine1 = coro_work('ONE', 3)
    coroutine2 = coro_work('TWO', 1)

    task1 = loop.create_task(coroutine1)
    task2 = asyncio.ensure_future(coroutine2)

    gather = asyncio.gather(task1, task2)
    loop.run_until_complete(gather)

    print(f'[task1 result] {task1.result()}')
    print(f'[task2 result] {task2.result()}')

    end = time.time()
    print(f'运行耗时:{end - start:.4f}')


if __name__ == '__main__':
    main()

await 关键字等同于 Python3.4 中的 yield from 语句,后面接协程对象。asyncio.sleep 方法的返回值为协程对象,此处为阻塞运行。

协程函数的 return 值在协程运行结束后通过调用对应 task 对象的 result 方法返回。

asyncio.gather 方法接收多个 task 作为参数,创建任务搜集器。

run_until_complete 方法也可接收任务搜集器作为参数,并阻塞运行,直到全部任务完成。任务结束后,事件循环终止,打印任务的 result 方法返回值,即协程函数的 return 值。

运行结果:在事件循环内部,2 个协程是交替运行完成的:首先运行 task1 ,打印 [coro_work] Coroutine ONE start.task1 运行到 asyncio.sleep 阻塞,让步 CPU 的使用权给 task2 执行,打印 [coro_work] Coroutine TWO start.task2 运行到 asyncio.sleep 阻塞,再次让步 CPU 的使用权,但此刻事件循环发现所有协程都处于阻塞状态,只能等待阻塞结束。

task2 的阻塞时间较短,阻塞 1s 后结束,打印 [coro_work] Coroutine TWO finished.;又过了 2s,阻塞 3s 的 task1 也结束了,打印 [coro_work] Coroutine ONE finished.

至此,2 个任务全部完成,事件循环停止,打印 task1 和 task2 的返回值,任务总耗时约 3s,如果使用单线程同步模型则至少 4s 。

注意:

  • 多数情况下无需调用 taskadd_done_callback 方法,可以直接把回调函数中的代码写入 await 语句后面,协程是可以暂停和恢复的。
  • 多数情况下同样不需要调用 taskresult 方法获取协程函数的 return 值,因为事件循环的 run_until_complete 方法的返回值就是协程函数的返回值。
  • 事件循环有一个 stop 方法来停止循环和一个 close 方法来关闭循环。以上示例均没有调用 loop.close 方法,似乎并没有什么问题,那调用 loop.close 是否是必须的呢?
    简言之,loop 只要不关闭,就可以再次运行 run_until_complete() 方法,关闭后则不可运行。有人建议调用 loop.close ,以彻底清理 loop 对象防止误用,其实多数情况下并无必要。
  • asyncio 提供了 asyncio.gatherasyncio.wait 两个任务搜集方法,它们的作用相同,都是将协程任务按顺序排定,再将返回值作为参数加入到事件循环中。
    二者的主要区别在于:asyncio.wait 可以获取任务的执行状态PENDING FINISHED。当有一些特殊需求,比如某些情况下取消任务,可以使用 asyncio.wait 搜集器。

三. 取消任务

在事件循环启动之后,停止之前,我们可以手动取消任务的执行,但注意只有 PENDING 状态的任务才允许取消,FINISHED 状态的任务已经完成,自然无法取消。

import asyncio


async def work(id, t):
    print('Working...')
    await asyncio.sleep(t)
    print(f'Work {id} done.')


def main():
    loop = asyncio.get_event_loop()
    coroutines = [work(i, i) for i in range(1, 4)]
    try:
        loop.run_until_complete(asyncio.gather(*coroutines))
    except KeyboardInterrupt:
        loop.stop()  # 取消所有未完成的任务,停止事件循环
    finally:
        loop.close()  # 关闭事件循环


if __name__ == '__main__':
    main()

运行结果:程序运行过程中,按 Ctrl + C 会触发 KeyboardInterrupt 异常。捕获这个异常,将取消所有未完成的任务。

除了使用事件循环的 stop 方法取消所有未完成的任务,还可以直接调用任务的 cancel 方法,而 asyncio.Task.all_tasks 方法可以获得事件循环中的全部任务。

下面,我们修改上述实例的 main() 函数代码:

import asyncio
 
async def work(id, t):
    print('Working...')
    await asyncio.sleep(t)
    print(f'Work {id} done.')
 
def main():
    loop = asyncio.get_event_loop()
    coroutines = [work(i, i) for i in range(1, 4)]
    try:
        loop.run_until_complete(asyncio.gather(*coroutines))
    except KeyboardInterrupt:
        # loop.stop()  # 取消所有未完成的任务,停止事件循环
        print()
        tasks = asyncio.Task.all_tasks()
        for task in tasks:
            print(f'正在取消任务:{task}')
            print(f'任务取消:{task.cancel()}')
    finally:
        loop.close()  # 关闭事件循环
 
if __name__ == '__main__':
    main()

运行结果: 程序运行到 work 1 done 输出时,按下 Ctrl + C 会触发 KeyboardInterrupt 异常。asyncio.Task.all_tasks() 可以捕获事件循环中的所有任务的集合,任务状态有 PENDINGFINISHED 两者。任务的 cancel 方法可以取消未完成的任务,取消成功返回 True ,已完成的任务由于取消失败返回 False

举报

相关推荐

0 条评论