您现在的位置是:首页 >技术杂谈 >Python:asyncio.wait 和 asyncio.gather 总结网站首页技术杂谈
Python:asyncio.wait 和 asyncio.gather 总结
0.协程理解:
Future
future是一个数据结构,表示还未完成的工作结果。事件循环可以监视Future对象是否完成。从而允许应用的一部分等待另一部分完成一些工作。Future
获取Futrue里的结果
future表示还没有完成的工作结果。事件循环可以通过监视一个future对象的状态来指示它已经完成。future对象有几个状态:
Pending
Running
Done
Cancelled
创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消,状态为cancel。
Task
task是Future的一个子类,它知道如何包装和管理一个协程的执行。任务所需的资源
可用时,事件循环会调度任务允许,并生成一个结果,从而可以由其他协程消费。
event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
async/await 关键字:python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
原文链接:python并发运行协程 asyncio.gather 和 asyncio.wait_林胖胖哒的博客-CSDN博客
Python协程及asyncio基础知识
协程(coroutine)也叫微线程,是实现多任务的另一种方式,是比线程更小的执行单元,一般运行在单进程和单线程上。因为它自带CPU的上下文,它可以通过简单的事件循环切换任务,比进程和线程的切换效率更高,这是因为进程和线程的切换由操作系统进行。
Python实现协程的主要借助于两个库:asyncio和gevent。由于asyncio已经成为python的标准库了无需pip安装即可使用,这意味着asyncio作为Python原生的协程实现方式会更加流行。本文仅会介绍asyncio模块。如果大家对gevent也有需求,请留言,我会单独写篇文章介绍这个库的使用。
asyncio 是从Python3.4引入的标准库,直接内置了对协程异步IO的支持。asyncio 的编程模型本质是一个消息循环,我们一般先定义一个协程函数(或任务), 从 asyncio 模块中获取事件循环loop,然后把需要执行的协程任务(或任务列表)扔到 loop中执行,就实现了异步IO。
定义协程函数及执行方法的演变
在最早的Python 3.4中,协程函数是通过@asyncio.coroutine 和 yeild from 实现的, 如下所示。
import asyncio
@asyncio.coroutine
def func1(i):
print("协程函数{}马上开始执行。".format(i))
yield from asyncio.sleep(2)
print("协程函数{}执行完毕!".format(i))
if __name__ == '__main__':
# 获取事件循环
loop = asyncio.get_event_loop()
# 执行协程任务
loop.run_until_complete(func1(1))
# 关闭事件循环
loop.close()
这里我们定义了一个func1的协程函数,我们可以使用asyncio.iscoroutinefunction来验证。定义好协程函数后,我们首先获取事件循环loop,使用它的run_until_complete方法执行协程任务,然后关闭loop。
print(asyncio.iscoroutinefunction(func1(1))) # True
Python 3.5以后引入了async/await 语法定义协程函数,代码如下所示。每个协程函数都以async声明,以区别于普通函数,对于耗时的代码或函数我们使用await声明,表示碰到等待时挂起,以切换到其它任务。
import asyncio
# 这是一个协程函数
async def func1(i):
print("协程函数{}马上开始执行。".format(i))
await asyncio.sleep(2)
print("协程函数{}执行完毕!".format(i))
if __name__ == '__main__':
# 获取事件循环
loop = asyncio.get_event_loop()
# 执行协程任务
loop.run_until_complete(func1(1))
# 关闭事件循环
loop.close()
Python 3.7之前执行协程任务都是分三步进行的,代码有点冗余。Python 3.7提供了一个更简便的asyncio.run方法,上面代码可以简化为:
import asyncio
async def func1(i):
print(f"协程函数{i}马上开始执行。")
await asyncio.sleep(2)
print(f"协程函数{i}执行完毕!")
if __name__ == '__main__':
asyncio.run(func1(1))
注:Python自3.6版本起可以使用f-string来对字符串进行格式化了,相当于format函数的简化版。
创建协程任务的演变
前面的演示案例中,我们只执行了单个协程任务(函数)。实际应用中,我们先由协程函数创建协程任务,然后把它们加入协程任务列表,最后一起交由事件循环执行。
根据协程函数创建协程任务有多种方法,其中最新的是Python 3.7版本提供的asyncio.create_task方法,如下所示:
# 方法1:使用ensure_future方法。future代表一个对象,未执行的任务。
task1 = asyncio.ensure_future(func1(1))
task2 = asyncio.ensure_future(func1(2))
# 方法2:使用loop.create_task方法
task1 = loop.create_task(func1(1))
task2 = loop.create_task(func1(2))
# 方法3:使用Python 3.7提供的asyncio.create_task方法
task1 = asyncio.create_task(func1(1))
task2 = asyncio.create_task(func1(2))
1. asyncio.wait 和 asyncio.gather 的异同点综述
- 相同
:从功能上看,asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。
- 不同
:asyncio.wait 使用一个set保存它创建的Task实例,因为set是无序的所以这也就是我们的任务不是顺序执行的原因。会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;
而asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果,如果列表中传入的不是create_task方法创建的协程任务,它会自动将函数封装成协程任务
2. asyncio.wait 用法:
最常见的写法是:await asyncio.wait(task_list)。
import asyncio
import arrow
def current_time():
'''
获取当前时间
:return:
'''
cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
return cur_time
async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
await asyncio.sleep(sleep_time)
print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(func(i))
task_list.append(task)
#会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task
done, pending = await asyncio.wait(task_list, timeout=None)
for done_task in done:
print((f"[{current_time()}] 得到执行结果 {done_task.result()}"))
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
if __name__ == '__main__':
main()
代码执行结果如下:
[2020-11-03 22:45:53] 执行异步函数 func-0
[2020-11-03 22:45:53] 执行异步函数 func-1
[2020-11-03 22:45:53] 执行异步函数 func-2
[2020-11-03 22:45:53] 执行异步函数 func-3
[2020-11-03 22:45:53] 执行异步函数 func-4
[2020-11-03 22:45:53] 函数 func-0 执行完毕
[2020-11-03 22:45:54] 函数 func-1 执行完毕
[2020-11-03 22:45:55] 函数 func-2 执行完毕
[2020-11-03 22:45:56] 函数 func-3 执行完毕
[2020-11-03 22:45:57] 函数 func-4 执行完毕
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:57] 得到函数 func-4 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:55] 得到函数 func-2 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:53] 得到函数 func-0 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:56] 得到函数 func-3 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:54] 得到函数 func-1 执行结果】
3. asyncio.gather 用法:
最常见的用法是:await asyncio.gather(*task_list),注意这里 task_list 前面有一个 *(把list解构)。
import asyncio
import arrow
def current_time():
'''
获取当前时间
:return:
'''
cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
return cur_time
async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
await asyncio.sleep(sleep_time)
print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(func(i))
task_list.append(task)
results = await asyncio.gather(*task_list)
for result in results:
print((f"[{current_time()}] 得到执行结果 {result}"))
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
if __name__ == '__main__':
main()
代码执行结果如下:
[2020-11-03 22:35:54] 执行异步函数 func-0
[2020-11-03 22:35:54] 执行异步函数 func-1
[2020-11-03 22:35:54] 执行异步函数 func-2
[2020-11-03 22:35:54] 执行异步函数 func-3
[2020-11-03 22:35:54] 执行异步函数 func-4
[2020-11-03 22:35:54] 函数 func-0 执行完毕
[2020-11-03 22:35:55] 函数 func-1 执行完毕
[2020-11-03 22:35:56] 函数 func-2 执行完毕
[2020-11-03 22:35:57] 函数 func-3 执行完毕
[2020-11-03 22:35:58] 函数 func-4 执行完毕
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:54] 得到函数 func-0 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:55] 得到函数 func-1 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:56] 得到函数 func-2 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:57] 得到函数 func-3 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:58] 得到函数 func-4 执行结果】
import asyncio
"""asyncio.gather方法,它的作用asyncio.wait方法类似,但更强大。
如果列表中传入的不是create_task方法创建的协程任务,
它会自动将函数封装成协程任务,如下所示:"""
async def func1(i):
print(f"协程函数{i}马上开始执行。")
await asyncio.sleep(2)
print(f"协程函数{i}执行完毕!")
async def main():
tasks = []
for i in range(1, 5):
# 这里未由协程函数创建协程任务
tasks.append(func1(i))
# 注意这里*号。gather自动将函数列表封装成了协程任务。
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
asyncio高级使用方法
给任务添加回调函数
我们还可以给每个协程任务通过add_done_callback的方法给单个协程任务添加回调函数,如下所示:
#-*- coding:utf-8 -*-
import asyncio
async def func1(i):
print(f"协程函数{i}马上开始执行。")
await asyncio.sleep(2)
return i
# 回调函数
def callback(future):
print(f"执行结果:{future.result()}")
async def main():
tasks = []
for i in range(1, 5):
task = asyncio.create_task(func1(i))
# 注意这里,增加回调函数
task.add_done_callback(callback)
tasks.append(task)
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
设置任务超时
很多协程任务都是很耗时的,当你使用wait方法收集协程任务时,可通过timeout选项设置任务切换前单个任务最大等待时间长度,如下所示:
# 获取任务执行结果,如下所示: done,pending = await asyncio.wait(tasks, timeout=10)
自省
- asyncio.current_task: 返回当前运行的Task实例,如果没有正在运行的任务则返回 None。如果 loop 为 None 则会使用 get_running_loop()获取当前事件循环。
- asyncio.all_tasks: 返回事件循环所运行的未完成的Task对象的集合。