我已经阅读了很多关于 Python 3.5+ 中 asyncio
/ async
/ await
的示例、博客文章、问题/答案,其中很多都很复杂,我发现最简单的可能是 this one。
它仍然使用ensure_future
,为了学习 Python 中的异步编程,我希望看到一个更简单的示例,以及执行基本异步/等待示例所需的最少工具。
问题:是否可以通过仅使用这两个关键字 + 运行异步循环的代码 + 其他 Python 代码但没有其他 {3 } 功能?
示例:像这样:
import asyncio
async def async_foo():
print("async_foo started")
await asyncio.sleep(5)
print("async_foo done")
async def main():
asyncio.ensure_future(async_foo()) # fire and forget async_foo()
print('Do some actions 1')
await asyncio.sleep(5)
print('Do some actions 2')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
但没有 ensure_future
,仍然演示了 await / async 的工作原理。
为了回答您的问题,我将针对同一问题提供 3 种不同的解决方案。
案例1:只是普通的Python
import time
def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)
def sum(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
sleep()
total += number
print(f'Task {name}: Sum = {total}\n')
start = time.time()
tasks = [
sum("A", [1, 2]),
sum("B", [1, 2, 3]),
]
end = time.time()
print(f'Time: {end-start:.2f} sec')
输出:
Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3
Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6
Time: 5.02 sec
案例 2:async/await 做错了
import asyncio
import time
async def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)
async def sum(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
await sleep()
total += number
print(f'Task {name}: Sum = {total}\n')
start = time.time()
loop = asyncio.get_event_loop()
tasks = [
loop.create_task(sum("A", [1, 2])),
loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print(f'Time: {end-start:.2f} sec')
输出:
Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3
Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6
Time: 5.01 sec
案例 3:async/await 正确完成
除了 sleep
函数外,与案例 2 相同:
async def sleep():
print(f'Time: {time.time() - start:.2f}')
await asyncio.sleep(1)
输出:
Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3
Task B: Computing 3+3
Time: 2.00
Task B: Sum = 6
Time: 3.01 sec
案例 1 和案例 2 给出相同的 5 秒,而案例 3 仅 3 秒。所以正确的 async/await 会更快。
差异的原因在于 sleep
函数的实现。
# case 1
def sleep():
...
time.sleep(1)
# case 2
async def sleep():
...
time.sleep(1)
# case 3
async def sleep():
...
await asyncio.sleep(1)
在案例 1 和案例 2 中,它们是“相同的”:它们“休眠”而不允许其他人使用资源。而在情况 3 中,它允许在睡眠时访问资源。
在案例 2 中,我们将 async
添加到普通函数中。然而,事件循环会不间断地运行。为什么?因为我们没有说允许循环在哪里中断你的函数来运行另一个任务。
在案例 3 中,我们准确地告诉事件循环在哪里中断函数以运行另一个任务。具体在哪里?就在这儿!
await asyncio.sleep(1)
有关此阅读的更多信息here
2020 年 5 月 2 日更新
考虑阅读
异步编程搭便车指南
Asyncio 期货和协程
是否可以通过仅使用这两个关键字 + asyncio.get_event_loop() + run_until_complete + 其他 Python 代码但不使用其他 asyncio 函数来给出一个简单的示例来展示 async / await 的工作原理?
这样就可以编写有效的代码:
import asyncio
async def main():
print('done!')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
但是这样就不可能证明你为什么需要 asyncio。
顺便说一句,为什么需要asyncio
,而不仅仅是纯代码?答案是 - asyncio
允许您在并行化 I/O 阻塞操作(如读取/写入网络)时获得性能优势。要编写有用的示例,您需要使用这些操作的异步实现。
请阅读 this answer 以获得更详细的说明。
更新:
好的,下面的示例使用 asyncio.sleep
来模拟 I/O 阻塞操作,而 asyncio.gather
显示了如何同时运行多个阻塞操作:
import asyncio
async def io_related(name):
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} finished')
async def main():
await asyncio.gather(
io_related('first'),
io_related('second'),
) # 1s + 1s = over 1s
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
first started
second started
first finished
second finished
[Finished in 1.2s]
请注意两个 io_related
是如何启动的,仅仅一秒钟后,两者都完成了。
asyncio.sleep
。
gather
或 ensure_future
之类的东西或类似的东西?
gather
/ensure_future
的情况下编写异步示例,它会起作用(请参阅答案中的第一个代码片段)。但是如果没有 gather
/ensure_future
,您将无法同时(并行)运行协程,并且您根本无法从使用 asyncio
中获益。换句话说,这不是强制性的,但在没有 gather
/ensure_future
的情况下使用 asyncio
是没有意义的。
gather
/ensure_future
),那么您只需等待一些第三方协程即可受益。
Python 3.7+ 现在有 a simpler API (在我看来),措辞更简单(比“ensure_future”更容易记住):您可以使用 create_task
返回一个 Task 对象(如果需要,这可以用于稍后取消任务)。
基本示例 1
import asyncio
async def hello(i):
print(f"hello {i} started")
await asyncio.sleep(4)
print(f"hello {i} done")
async def main():
task1 = asyncio.create_task(hello(1)) # returns immediately, the task is created
await asyncio.sleep(3)
task2 = asyncio.create_task(hello(2))
await task1
await task2
asyncio.run(main()) # main loop
结果:
你好 1 开始 你好 2 开始 你好 1 完成 你好 2 完成
基本示例 2
如果您需要获取这些异步函数的返回值,那么 gather
很有用。以下示例的灵感来自 documentation。
import asyncio
async def factorial(n):
f = 1
for i in range(2, n + 1):
print(f"Computing factorial({n}), currently i={i}...")
await asyncio.sleep(1)
f *= i
return f
async def main():
L = await asyncio.gather(factorial(2), factorial(3), factorial(4))
print(L) # [2, 6, 24]
asyncio.run(main())
预期输出:
计算阶乘(2),当前 i=2... 计算阶乘(3),当前 i=2... 计算阶乘(4),当前 i=2... 计算阶乘(3),当前 i=3。 .. 计算阶乘(4),当前 i=3... 计算阶乘(4),当前 i=4... [2, 6, 24]
PS:即使你使用 asyncio
,而不是 trio
,the tutorial of the latter 对我理解 Python 异步编程很有帮助。
async
的示例。没有异步播放睡眠时间无关紧要,而使用异步则可以。所以有并发竞赛..
既然一切都很好地解释了,那么让我们运行一些带有事件循环的示例,将同步代码与异步代码进行比较。
同步代码:
import time
def count():
time.sleep(1)
print('1')
time.sleep(1)
print('2')
time.sleep(1)
print('3')
def main():
for i in range(3):
count()
if __name__ == "__main__":
t = time.perf_counter()
main()
t2 = time.perf_counter()
print(f'Total time elapsed: {t2:0.2f} seconds')
输出:
1
2
3
1
2
3
1
2
3
Total time elapsed: 9.00 seconds
我们可以看到,在下一个周期开始之前,每个计数周期都运行到完成。
异步代码:
import asyncio
import time
async def count():
await asyncio.sleep(1)
print('1')
await asyncio.sleep(1)
print('2')
await asyncio.sleep(1)
print('3')
async def main():
await asyncio.gather(count(), count(), count())
if __name__ == "__main__":
t = time.perf_counter()
asyncio.run(main())
t2 = time.perf_counter()
print(f'Total time elapsed: {t2:0.2f} seconds')
输出:
1
1
1
2
2
2
3
3
3
Total time elapsed: 3.00 seconds
另一方面,异步等效项看起来像这样运行需要 3 秒而不是 9 秒。第一个计数周期开始,一旦它进入 await
睡眠,Python 就可以自由地做其他工作,例如开始第二个计数周期和随后的第三个计数周期。这就是为什么我们拥有所有管而不是所有管然后所有三个。在输出编程中并发可以是一个非常有价值的工具。多处理具有执行所有多任务工作的操作,并且在 Python 中,它是多核并发的唯一选择,即让您的程序在 CPU 的多个内核上执行。如果使用线程,那么操作系统仍在执行所有多任务工作,并且在 cpython 中,全局 intrepeter 锁可防止异步编程中的多核并发。没有操作系统干预,只有一个进程,一个线程,所以运行良好的任务可以在有等待期时释放 CPU,以便其他任务可以使用它。
import asyncio
loop = asyncio.get_event_loop()
async def greeter(name):
print(f"Hi, {name} you're in a coroutine.")
try:
print('starting coroutine')
coro = greeter('LP')
print('entering event loop')
loop.run_until_complete(coro)
finally:
print('closing event loop')
loop.close()
输出:
starting coroutine
entering event loop
Hi, LP you're in a coroutine.
closing event loop
异步框架需要一个调度程序,通常称为事件循环。这个事件循环跟踪所有正在运行的任务,当一个函数挂起时,它会将控制权返回给事件循环,然后它会找到另一个函数来启动或恢复,这称为协作多任务。 Async IO 提供了一个框架,一个以事件循环为中心的异步框架,它有效地处理输入/输出事件,应用程序与事件循环显式交互它注册要运行的代码,然后它让事件循环调度程序进行必要的调用资源可用时的应用程序代码。因此,如果网络服务器打开套接字,然后注册它们以便在它们上发生输入事件时被告知,当有新的传入连接或有数据要读取时,事件循环将提醒服务器代码。如果从套接字读取的数据不超过服务器,则将控制权交还给事件循环。
从让出控制权回到事件循环的机制依赖于协同程序协同程序是一种为并发操作而设计的语言结构。协程可以使用 awake 关键字与另一个协程暂停执行,并且在暂停时协程状态保持不变,允许它从中断的地方继续,一个协程可以启动另一个协程,然后等待结果,这更容易将任务分解为可重用的部分。
import asyncio
loop = asyncio.get_event_loop()
async def outer():
print('in outer')
print('waiting for result 1')
result1 = await phase1()
print('waiting for result 2')
result2 = await phase2(result1)
return result1, result2
async def phase1():
print('in phase1')
return 'phase1 result'
async def phase2(arg):
print('in phase2')
return 'result2 derived from {}'.format(arg)
asyncio.run(outer())
输出:
in outer
waiting for result 1
in phase1
waiting for result 2
in phase2
此示例询问必须按顺序执行但可以与其他操作同时运行的两个阶段。使用 awake
关键字而不是向循环中添加新的协程,因为控制流已经在由循环管理的协程内部。没有必要告诉循环来管理新的协同程序。
简单..甜美..真棒..✅
import asyncio
import time
import random
async def eat():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Eating")
async def sleep():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Sleeping")
async def repeat():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Repeating")
async def main():
for x in range(5):
await asyncio.gather(eat(),sleep(),repeat())
time.sleep(2)
print("+","-"*20)
if __name__ == "__main__":
t = time.perf_counter()
asyncio.run(main())
t2 = time.perf_counter()
print(f'Total time elapsed: {t2:0.2f} seconds')
t
?没有意义,t2
对脚本的开始和结束进行了区分。
asncio.sleep
的并行性而很快,而在 time.sleep
的情况下,日期之间的间隔很慢?
我不知道为什么,但是关于这个主题的所有解释都太复杂了,或者他们使用了无用的 asyncio.sleep() 示例......到目前为止,我发现的最好的代码示例是:https://codeflex.co/python3-async-await-example/
每个人似乎都专注于将 time.sleep
切换到 asyncio.sleep
,但在现实世界中,这始终是不可能的。有时您需要进行库调用,这可能会进行 API 调用(例如:从 google 请求签名 URL)。
以下是您仍然可以使用 time.sleep
的方法,但以异步方式:
import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor
def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)
async def sum(name, numbers):
_executor = ThreadPoolExecutor(2)
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
await loop.run_in_executor(_executor, sleep)
total += number
print(f'Task {name}: Sum = {total}\n')
start = time.time()
loop = asyncio.get_event_loop()
tasks = [
loop.create_task(sum("A", [1, 2])),
loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print(f'Time: {end-start:.2f} sec')
输出:
Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3
Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6
Time: 3.01 sec
甚至认为顶部的一些答案我认为有点抽象
from datetime import datetime
import asyncio
async def time_taking(max_val,task_no):
print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
await asyncio.sleep(2)
value_list = []
for i in range(0,max_val):
value_list.append(i)
print("****FINSIHING UP TASk NO {} **".format(task_no))
return value_list
async def test2(task_no):
await asyncio.sleep(5)
print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
await asyncio.sleep(5)
print("****FINSIHING UP TASk NO {} **".format(task_no))
async def function(value = None):
tasks = []
start_time = datetime.now()
# CONCURRENT TASKS
tasks.append(asyncio.create_task(time_taking(20,1)))
tasks.append(asyncio.create_task(time_taking(43,2)))
tasks.append(asyncio.create_task(test2(3)))
# concurrent execution
lists = await asyncio.gather(*tasks)
end_time = datetime.now()
time_taken = end_time - start_time
return lists,time_taken
# run inside event loop
res,time_taken = asyncio.run(function())
print(res,time_taken)
import asyncio
import requests
async def fetch_users():
response = requests.get('https://www.testjsonapi.com/users/')
users = response.json()
return users
async def print_users():
# create an asynchronous task to run concurrently
# which wont block executing print statement before it finishes
response = asyncio.create_task(fetch_users())
print("Fetching users ")
# wait to get users data from response before printing users
users = await response
for user in users:
print(f"name : {user['name']} email : {user['email']}")
asyncio.run(print_users())
print("All users printed in console")
输出看起来像这样
Fetching users
name : Harjas Malhotra email : harjas@gmail.com
name : Alisha Paul email : alisha@gmail.com
name : Mart Right email : marrk9658@yahoo.com
name : Brad Pitter email : brad@gmail.com
name : Ervin Dugg email : Ervin69@gmail.com
name : Graham Bell email : Graham@bell.biz
name : James Rush email : james369@hotmail.com
name : Deepak Dev email : deepak@gmail.com
name : Ajay Rich email : therichposts@gmail.com
All users printed in console
让我们看看代码是如何工作的。首先,当 python 调用 print_users()
时,它不会让它下面的 print 语句被执行,直到它完成。因此,在进入 print_users()
之后,将创建一个并发任务,以便它下面的语句可以与这里的 fetch_users()
任务同时运行。该任务何时运行 Fetching users
将在控制台中打印。之后,python 将等待来自 fetch_users()
的响应,因为在接收之前不应打印用户。 fetch_users()
完成后,所有用户名和电子邮件都将打印在控制台中。因此,在完成下面的 print_users()
打印语句后,它将被执行。
response = fetch_users()
- 你真的需要调用 asyncio.create_task()
吗?我问是因为该方法已经定义为异步。通常,您会在任务创建中包装一个非异步方法以使其异步。您是否不必要地嵌套任务?