ChatGPT解决这个技术问题 Extra ChatGPT

Python 中最简单的异步/等待示例

我已经阅读了很多关于 Python 3.5+ 中 asyncio / async / await 的示例、博客文章、问题/答案,其中很多都很复杂,我发现最简单的可能是 this one
它仍然使用ensure_future,为了学习 Python 中的异步编程,我希望看到一个更简单的示例,以及执行基本异步/等待示例所需的最少工具

问题:是否可以通过仅使用这两个关键字 + 运行异步循环的代码 + 其他 Python 代码但没有其他 {3 } 功能?

示例:像这样:

import asyncio

async def async_foo():
    print("async_foo started")
    await asyncio.sleep(5)
    print("async_foo done")

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()
    print('Do some actions 1')
    await asyncio.sleep(5)
    print('Do some actions 2')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

但没有 ensure_future,仍然演示了 await / async 的工作原理。


w
wjandrea

为了回答您的问题,我将针对同一问题提供 3 种不同的解决方案。

案例1:只是普通的Python

import time

def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        sleep()
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()
tasks = [
    sum("A", [1, 2]),
    sum("B", [1, 2, 3]),
]
end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6

Time: 5.02 sec

案例 2:async/await 做错了

import asyncio
import time

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

async def sum(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await sleep()
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6

Time: 5.01 sec

案例 3:async/await 正确完成

除了 sleep 函数外,与案例 2 相同:

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    await asyncio.sleep(1)

输出:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 3+3
Time: 2.00
Task B: Sum = 6

Time: 3.01 sec

案例 1 和案例 2 给出相同的 5 秒,而案例 3 仅 3 秒。所以正确的 async/await 会更快。

差异的原因在于 sleep 函数的实现。

# case 1
def sleep():
    ...
    time.sleep(1)

# case 2
async def sleep():
    ...
    time.sleep(1)

# case 3
async def sleep():
    ...
    await asyncio.sleep(1)

在案例 1 和案例 2 中,它们是“相同的”:它们“休眠”而不允许其他人使用资源。而在情况 3 中,它允许在睡眠时访问资源。

在案例 2 中,我们将 async 添加到普通函数中。然而,事件循环会不间断地运行。为什么?因为我们没有说允许循环在哪里中断你的函数来运行另一个任务。

在案例 3 中,我们准确地告诉事件循环在哪里中断函数以运行另一个任务。具体在哪里?就在这儿!

await asyncio.sleep(1)

有关此阅读的更多信息here

2020 年 5 月 2 日更新

考虑阅读

异步编程搭便车指南

Asyncio 期货和协程


您可能还想提到 sleep 语句通常表示 IO 操作
如果 async 函数和以下计算之间存在依赖关系怎么办?
非常好。如果这在官方文档中会很棒。
@quickinsights io 处于睡眠状态,因为您输入 ms 并将 sleep 作为输出?
M
Mikhail Gerasimov

是否可以通过仅使用这两个关键字 + asyncio.get_event_loop() + run_until_complete + 其他 Python 代码但不使用其他 asyncio 函数来给出一个简单的示例来展示 async / await 的工作原理?

这样就可以编写有效的代码:

import asyncio


async def main():
    print('done!')


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

但是这样就不可能证明你为什么需要 asyncio。

顺便说一句,为什么需要asyncio,而不仅仅是纯代码?答案是 - asyncio 允许您在并行化 I/O 阻塞操作(如读取/写入网络)时获得性能优势。要编写有用的示例,您需要使用这些操作的异步实现。

请阅读 this answer 以获得更详细的说明。

更新:

好的,下面的示例使用 asyncio.sleep 来模拟 I/O 阻塞操作,而 asyncio.gather 显示了如何同时运行多个阻塞操作:

import asyncio


async def io_related(name):
    print(f'{name} started')
    await asyncio.sleep(1)
    print(f'{name} finished')


async def main():
    await asyncio.gather(
        io_related('first'),
        io_related('second'),
    )  # 1s + 1s = over 1s


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

first started
second started
first finished
second finished
[Finished in 1.2s]

请注意两个 io_related 是如何启动的,仅仅一秒钟后,两者都完成了。


@Basj,我现在看到了:) 我用更有用的例子更新了答案。希望它会有所帮助。在现实生活中,一切都将相同,除了您将使用异步实现的 I/O 操作而不是 asyncio.sleep
谢谢你。那么是否必须使用 gatherensure_future 之类的东西或类似的东西?
@Basj 从技术上讲,您可以在没有 gather/ensure_future 的情况下编写异步示例,它会起作用(请参阅答案中的第一个代码片段)。但是如果没有 gather/ensure_future,您将无法同时(并行)运行协程,并且您根本无法从使用 asyncio 中获益。换句话说,这不是强制性的,但在没有 gather/ensure_future 的情况下使用 asyncio 是没有意义的。
@Basj 当然,如果这个协程在自我实现中并行化事物(使用 gather/ensure_future),那么您只需等待一些第三方协程即可受益。
B
Basj

Python 3.7+ 现在有 a simpler API (在我看来),措辞更简单(比“ensure_future”更容易记住):您可以使用 create_task 返回一个 Task 对象(如果需要,这可以用于稍后取消任务)。

基本示例 1

import asyncio

async def hello(i):
    print(f"hello {i} started")
    await asyncio.sleep(4)
    print(f"hello {i} done")

async def main():
    task1 = asyncio.create_task(hello(1))  # returns immediately, the task is created
    await asyncio.sleep(3)
    task2 = asyncio.create_task(hello(2))
    await task1
    await task2

asyncio.run(main())  # main loop

结果:

你好 1 开始 你好 2 开始 你好 1 完成 你好 2 完成

基本示例 2

如果您需要获取这些异步函数的返回值,那么 gather 很有用。以下示例的灵感来自 documentation

import asyncio

async def factorial(n):
    f = 1
    for i in range(2, n + 1):
        print(f"Computing factorial({n}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    return f

async def main():
    L = await asyncio.gather(factorial(2), factorial(3), factorial(4))
    print(L)  # [2, 6, 24]

asyncio.run(main())

预期输出:

计算阶乘(2),当前 i=2... 计算阶乘(3),当前 i=2... 计算阶乘(4),当前 i=2... 计算阶乘(3),当前 i=3。 .. 计算阶乘(4),当前 i=3... 计算阶乘(4),当前 i=4... [2, 6, 24]

PS:即使你使用 asyncio,而不是 triothe tutorial of the latter 对我理解 Python 异步编程很有帮助。


对于“收集”,是否可以将值移到异步主目录之外?例如:我们返回 L 而不是 print(L)。对我来说它只返回 None。还是必须保留 async 函数?
很好 - 如果在示例 1 中两者的睡眠时间相似怎么办?为了使示例 1 更清晰,您应该添加一个不带 async 的示例。没有异步播放睡眠时间无关紧要,而使用异步则可以。所以有并发竞赛..
M
Milovan Tomašević

既然一切都很好地解释了,那么让我们运行一些带有事件循环的示例,将同步代码与异步代码进行比较。

同步代码:

import time

def count():
    time.sleep(1)
    print('1')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('3')

def main():
    for i in range(3):
        count()

if __name__ == "__main__":
    t = time.perf_counter()
    main()
    t2 = time.perf_counter()
    
    print(f'Total time elapsed: {t2:0.2f} seconds')

输出:

1
2
3
1
2
3
1
2
3
Total time elapsed: 9.00 seconds

我们可以看到,在下一个周期开始之前,每个计数周期都运行到完成。

异步代码:

import asyncio
import time

async def count():
    await asyncio.sleep(1)
    print('1')
    await asyncio.sleep(1)
    print('2')
    await asyncio.sleep(1)
    print('3')

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    t = time.perf_counter()
    asyncio.run(main())
    t2 = time.perf_counter()

    print(f'Total time elapsed: {t2:0.2f} seconds')

输出:

1
1
1
2
2
2
3
3
3
Total time elapsed: 3.00 seconds

另一方面,异步等效项看起来像这样运行需要 3 秒而不是 9 秒。第一个计数周期开始,一旦它进入 await 睡眠,Python 就可以自由地做其他工作,例如开始第二个计数周期和随后的第三个计数周期。这就是为什么我们拥有所有管而不是所有管然后所有三个。在输出编程中并发可以是一个非常有价值的工具。多处理具有执行所有多任务工作的操作,并且在 Python 中,它是多核并发的唯一选择,即让您的程序在 CPU 的多个内核上执行。如果使用线程,那么操作系统仍在执行所有多任务工作,并且在 cpython 中,全局 intrepeter 锁可防止异步编程中的多核并发。没有操作系统干预,只有一个进程,一个线程,所以运行良好的任务可以在有等待期时释放 CPU,以便其他任务可以使用它。

import asyncio

loop = asyncio.get_event_loop()


async def greeter(name):
    print(f"Hi, {name} you're in a coroutine.")

try:
    print('starting coroutine')
    coro = greeter('LP')
    print('entering event loop')
    loop.run_until_complete(coro)
finally:
    print('closing event loop')
    loop.close()

输出:

starting coroutine
entering event loop
Hi, LP you're in a coroutine.
closing event loop

异步框架需要一个调度程序,通常称为事件循环。这个事件循环跟踪所有正在运行的任务,当一个函数挂起时,它会将控制权返回给事件循环,然后它会找到另一个函数来启动或恢复,这称为协作多任务。 Async IO 提供了一个框架,一个以事件循环为中心的异步框架,它有效地处理输入/输出事件,应用程序与事件循环显式交互它注册要运行的代码,然后它让事件循环调度程序进行必要的调用资源可用时的应用程序代码。因此,如果网络服务器打开套接字,然后注册它们以便在它们上发生输入事件时被告知,当有新的传入连接或有数据要读取时,事件循环将提醒服务器代码。如果从套接字读取的数据不超过服务器,则将控制权交还给事件循环。

从让出控制权回到事件循环的机制依赖于协同程序协同程序是一种为并发操作而设计的语言结构。协程可以使用 awake 关键字与另一个协程暂停执行,并且在暂停时协程状态保持不变,允许它从中断的地方继续,一个协程可以启动另一个协程,然后等待结果,这更容易将任务分解为可重用的部分。

import asyncio

loop = asyncio.get_event_loop()

async def outer():
    print('in outer')
    print('waiting for result 1')
    result1 = await phase1()
    print('waiting for result 2')
    result2 = await phase2(result1)
    return result1, result2


async def phase1():
    print('in phase1')
    return 'phase1 result'

async def phase2(arg):
    print('in phase2')
    return 'result2 derived from {}'.format(arg)

asyncio.run(outer())

输出:

in outer
waiting for result 1
in phase1
waiting for result 2
in phase2

此示例询问必须按顺序执行但可以与其他操作同时运行的两个阶段。使用 awake 关键字而不是向循环中添加新的协程,因为控制流已经在由循环管理的协程内部。没有必要告诉循环来管理新的协同程序。


R
Rahul kuchhadia

简单..甜美..真棒..✅

  import asyncio
  import time
  import random

  async def eat():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Eating")

  async def sleep():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Sleeping")

  async def repeat():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Repeating")

  async def main():
     for x in range(5):
        await asyncio.gather(eat(),sleep(),repeat())
        time.sleep(2)
        print("+","-"*20)

  if __name__ == "__main__":
     t = time.perf_counter()
     asyncio.run(main())
     t2 = time.perf_counter()

     print(f'Total time elapsed: {t2:0.2f} seconds')

为什么需要 t?没有意义,t2 对脚本的开始和结束进行了区分。
因此,您想表明日期本身由于与 asncio.sleep 的并行性而很快,而在 time.sleep 的情况下,日期之间的间隔很慢?
@Timo 它用于总时间计算
J
JavaGoPro

我不知道为什么,但是关于这个主题的所有解释都太复杂了,或者他们使用了无用的 asyncio.sleep() 示例......到目前为止,我发现的最好的代码示例是:https://codeflex.co/python3-async-await-example/


不会说没用,但链接很好
链接也很复杂:(
S
SynackSA

每个人似乎都专注于将 time.sleep 切换到 asyncio.sleep,但在现实世界中,这始终是不可能的。有时您需要进行库调用,这可能会进行 API 调用(例如:从 google 请求签名 URL)。

以下是您仍然可以使用 time.sleep 的方法,但以异步方式:

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

async def sum(name, numbers):
    _executor = ThreadPoolExecutor(2)
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await loop.run_in_executor(_executor, sleep)
        total += number
    print(f'Task {name}: Sum = {total}\n')

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6

Time: 3.01 sec

A
Atif Shafi

甚至认为顶部的一些答案我认为有点抽象

from datetime import datetime
import asyncio




async def time_taking(max_val,task_no):
    print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))

    await asyncio.sleep(2)
    value_list = []
    for i in range(0,max_val):
        value_list.append(i)

    print("****FINSIHING UP TASk NO {}  **".format(task_no))
    return value_list



async def test2(task_no):
    await asyncio.sleep(5)
    print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
    await asyncio.sleep(5)
    print("****FINSIHING UP  TASk NO {}  **".format(task_no))

async def function(value = None):
    tasks = []
    start_time = datetime.now()
    
    # CONCURRENT TASKS
    tasks.append(asyncio.create_task(time_taking(20,1)))
    tasks.append(asyncio.create_task(time_taking(43,2)))
    tasks.append(asyncio.create_task(test2(3)))
    
    # concurrent execution
    lists = await asyncio.gather(*tasks)
    end_time = datetime.now()
    
    time_taken = end_time - start_time
    return lists,time_taken


# run inside event loop 
res,time_taken = asyncio.run(function())

print(res,time_taken)

R
Ruman
import asyncio
import requests

async def fetch_users():
    response = requests.get('https://www.testjsonapi.com/users/')
    users = response.json()
    return users

async def print_users():
    # create an asynchronous task to run concurrently 
    # which wont block executing print statement before it finishes
    response = asyncio.create_task(fetch_users())
    print("Fetching users ")
    # wait to get users data from response before printing users
    users = await response

    for user in users:
        print(f"name : {user['name']} email : {user['email']}")

asyncio.run(print_users())
print("All users printed in console")

输出看起来像这样

Fetching users
name : Harjas Malhotra email : harjas@gmail.com
name : Alisha Paul email : alisha@gmail.com
name : Mart Right email : marrk9658@yahoo.com
name : Brad Pitter email : brad@gmail.com
name : Ervin Dugg email : Ervin69@gmail.com 
name : Graham Bell email : Graham@bell.biz
name : James Rush email : james369@hotmail.com
name : Deepak Dev email : deepak@gmail.com
name : Ajay Rich email : therichposts@gmail.com
All users printed in console

让我们看看代码是如何工作的。首先,当 python 调用 print_users() 时,它不会让它下面的 print 语句被执行,直到它完成。因此,在进入 print_users() 之后,将创建一个并发任务,以便它下面的语句可以与这里的 fetch_users() 任务同时运行。该任务何时运行 Fetching users 将在控制台中打印。之后,python 将等待来自 fetch_users() 的响应,因为在接收之前不应打印用户。 fetch_users() 完成后,所有用户名和电子邮件都将打印在控制台中。因此,在完成下面的 print_users() 打印语句后,它将被执行。


请在您的答案中添加一些解释,以便其他人可以从中学习
如果你运行这个简单的程序然后观察输出肯定你可以理解一切是如何工作的。由于这个程序很容易理解,所以我没有添加任何解释。稍后我将尝试在代码中添加解释。谢谢
@Ruman 一个问题。在 c# 中,此行是有效的 response = fetch_users() - 你真的需要调用 asyncio.create_task() 吗?我问是因为该方法已经定义为异步。通常,您会在任务创建中包装一个非异步方法以使其异步。您是否不必要地嵌套任务?
@迈克尔Z。你说的对。由于 fetch_users() 函数已经是一个异步函数,这就是为什么 asyncio.create_task() 是多余的。简单地说,我们可以使用 response = fetch_users()。