ChatGPT解决这个技术问题 Extra ChatGPT

带有 Python 请求的异步请求

我尝试了 requests library for python 文档中提供的示例。

使用 async.map(rs),我获得了响应代码,但我想获得请求的每个页面的内容。例如,这不起作用:

out = async.map(rs)
print out[0].content
也许你得到的回复是空的?
为我工作。请发布您收到的完整错误。
没有错误。它只是通过提供的测试 url 永远运行。
当我在 https 上使用 url 时,它显然会出现。 http工作得很好
大多数答案都已过时。在 2021 年,当前的潮流效应赢家是:docs.aiohttp.org/en/stable

J
Jeff

笔记

以下答案适用于请求 v0.13.0+。编写此问题后,异步功能已移至 grequests。但是,您可以将 requests 替换为下面的 grequests,它应该可以工作。

我留下这个答案是为了反映关于使用 requests < v0.13.0 的原始问题。

要使用async.map 异步 执行多项任务,您必须:

为您想要对每个对象(您的任务)执行的操作定义一个函数将该函数添加为您的请求中的事件挂钩 在所有请求/操作的列表上调用 async.map

例子:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # 
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

留下您的评论的好主意:由于最新请求和 grequests 之间的兼容性问题(请求 1.1.0 中缺少 max_retries 选项),我不得不降级请求以检索异步,我发现异步功能已随版本 0.13+ 移动(pypi.python.org/pypi/requests)
from grequests import async 不起作用.. 这个定义对我有用 def do_something(response, **kwargs):,我从 stackoverflow.com/questions/15594015/… 中找到它
如果 async.map 调用仍然阻塞,那么这是如何异步的?除了请求本身是异步发送的,检索仍然是同步的吗?
import grequests as async 替换 from requests import async 对我有用。
grequests 现在推荐 requests-threadsrequests-futures
C
Community

async 现在是一个独立的模块:grequests

见这里:https://github.com/kennethreitz/grequests

还有:Ideal method for sending multiple HTTP requests over Python?

安装:

$ pip install grequests

用法:

建立一个堆栈:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

发送堆栈

grequests.map(rs)

结果看起来像

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests 似乎没有为并发请求设置限制,即当多个请求发送到同一服务器时。


关于并发请求的限制 - 您可以在运行 map()/imap() 时指定池大小。即 grequests.map(rs, size=20) 有 20 个并发抓取。
到目前为止,这不支持 python3(gevent 无法在 py3.4 上构建 v2.6)。
我不太了解异步部分。如果我让results = grequests.map(rs)这行之后的代码被阻塞,我可以看到异步效果吗?
在 github,repo 上,grequests 的作者建议使用 requests-threads 或 requests-futures。
K
Kumpelinus

我测试了 requests-futuresgrequests。 Grequests 更快,但会带来猴子补丁和其他依赖问题。 requests-futures 比 grequests 慢几倍。我决定编写自己的并将请求简单地包装到 ThreadPoolExecutor 中,它几乎与 grequests 一样快,但没有外部依赖项。

import requests
import concurrent.futures

def get_urls():
    return ["url1","url2"]

def load_url(url, timeout):
    return requests.get(url, timeout = timeout)

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    future_to_url = {executor.submit(load_url, url, 10): url for url in     get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

这里可能出现什么类型的异常?
requests.exceptions.Timeout
对不起,我不明白你的问题。在多个线程中仅使用单个 url?只有一种情况 DDoS 攻击 ))
我不明白为什么这个答案得到了这么多赞成。 OP 问题是关于异步请求的。 ThreadPoolExecutor 运行线程。是的,你可以在多个线程中发出请求,但这永远不会是一个异步程序,所以我怎么能回答原来的问题?
实际上,问题是关于如何并行加载 URL。是的,线程池执行器不是最好的选择,最好使用异步 io,但它在 Python 中运行良好。而且我不明白为什么线程不能用于异步?如果您需要异步运行 CPU 绑定任务怎么办?
D
DragonBobZ

不幸的是,据我所知,请求库不具备执行异步请求的能力。您可以将 async/await 语法包装在 requests 周围,但这将使底层请求的同步性不会降低。如果您想要真正的异步请求,则必须使用提供它的其他工具。一种这样的解决方案是 aiohttp (Python 3.5.3+)。根据我使用 Python 3.7 async/await 语法的经验,它运行良好。下面我编写了三个执行 n 个 Web 请求的实现,使用

使用 Python requests 库的纯同步请求 (sync_requests_get_all) 使用 Python 3.7 async/await 语法和 asyncio 包装的 Python requests 库的同步请求 (async_requests_get_all) 使用 Python 3.7 async/await 包装的 Python aiohttp 库的真正异步实现 (async_aiohttp_get_all)语法和异步

"""
Tested in Python 3.5.10
"""

import time
import asyncio
import requests
import aiohttp

from asgiref import sync

def timed(func):
    """
    records approximate durations of function calls
    """
    def wrapper(*args, **kwargs):
        start = time.time()
        print('{name:<30} started'.format(name=func.__name__))
        result = func(*args, **kwargs)
        duration = "{name:<30} finished in {elapsed:.2f} seconds".format(
            name=func.__name__, elapsed=time.time() - start
        )
        print(duration)
        timed.durations.append(duration)
        return result
    return wrapper

timed.durations = []


@timed
def sync_requests_get_all(urls):
    """
    performs synchronous get requests
    """
    # use session to reduce network overhead
    session = requests.Session()
    return [session.get(url).json() for url in urls]


@timed
def async_requests_get_all(urls):
    """
    asynchronous wrapper around synchronous requests
    """
    session = requests.Session()
    # wrap requests.get into an async function
    def get(url):
        return session.get(url).json()
    async_get = sync.sync_to_async(get)

    async def get_all(urls):
        return await asyncio.gather(*[
            async_get(url) for url in urls
        ])
    # call get_all as a sync function to be used in a sync context
    return sync.async_to_sync(get_all)(urls)

@timed
def async_aiohttp_get_all(urls):
    """
    performs asynchronous get requests
    """
    async def get_all(urls):
        async with aiohttp.ClientSession() as session:
            async def fetch(url):
                async with session.get(url) as response:
                    return await response.json()
            return await asyncio.gather(*[
                fetch(url) for url in urls
            ])
    # call get_all as a sync function to be used in a sync context
    return sync.async_to_sync(get_all)(urls)


if __name__ == '__main__':
    # this endpoint takes ~3 seconds to respond,
    # so a purely synchronous implementation should take
    # little more than 30 seconds and a purely asynchronous
    # implementation should take little more than 3 seconds.
    urls = ['https://postman-echo.com/delay/3']*10

    async_aiohttp_get_all(urls)
    async_requests_get_all(urls)
    sync_requests_get_all(urls)
    print('----------------------')
    [print(duration) for duration in timed.durations]

在我的机器上,这是输出:

async_aiohttp_get_all          started
async_aiohttp_get_all          finished in 3.20 seconds
async_requests_get_all         started
async_requests_get_all         finished in 30.61 seconds
sync_requests_get_all          started
sync_requests_get_all          finished in 30.59 seconds
----------------------
async_aiohttp_get_all          finished in 3.20 seconds
async_requests_get_all         finished in 30.61 seconds
sync_requests_get_all          finished in 30.59 seconds

“asnyc”是错字还是故意的?
绝对是错字
您的 async_aiohttp_get_all() 是一个不错的解决方案。我想出了类似的东西,但在它之外有一个额外的 async def fetch_all(urls): return await asyncio.gather(*[fetch(url) for url in urls]),我的解决方案为每个 URL 创建单独的 aiohttp.ClientSession() 实例,而通过嵌入本地函数,您可以重用相同的会话......更多 Pythonic IMO。您能否提醒我在存在 get_all() 的情况下使用 sync.async_to_sync() 与在不存在 asyncio.run() get_all() 的情况下使用的好处?
非常棒,绝对 async_aiohttp 比所有工作都好!
只是我还是这个纯 aiohttp 版本使用 asgiref.sync.async_to_sync 运行它?有没有办法在不包括额外模块的情况下做到这一点?
A
Androbin

也许 requests-futures 是另一种选择。

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

the office document 中也推荐使用它。如果您不想涉及 gevent,这是一个很好的选择。


最简单的解决方案之一。可以通过定义 max_workers 参数增加并发请求数
很高兴看到这个缩放的例子,所以我们不使用每个项目的一个变量名来循环。
每个请求有一个线程是对资源的极大浪费!例如,不可能同时执行 500 个请求,它会杀死你的 CPU。这绝不应该被认为是一个好的解决方案。
@CornelliuMaftuleac 好点。关于线程使用,您肯定需要关心它,并且库提供了启用线程池或处理池的选项。 ThreadPoolExecutor(max_workers=10)
@Dreampuf 处理池我相信更糟?
a
arshbot

我发布的大多数答案都有很多问题 - 它们要么使用已被移植但功能有限的已弃用库,要么提供了一个在执行请求时具有太多魔力的解决方案,从而难以处理错误。如果它们不属于上述类别之一,则它们是第 3 方库或已弃用。

一些解决方案纯粹在 http 请求中可以正常工作,但是对于任何其他类型的请求,这些解决方案都达不到要求,这很可笑。这里不需要高度定制的解决方案。

只需使用 python 内置库 asyncio 就足以执行任何类型的异步请求,并为复杂和特定于用例的错误处理提供足够的流动性。

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
       async_tasks.append(loop.create_task(get_chan_info_and_do_chores(id)))
       async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

它的工作原理很简单。您正在创建一系列您希望异步执行的任务,然后要求循环执行这些任务并在完成后退出。没有因缺乏维护而导致的额外库,也没有缺乏所需的功能。


如果我理解正确,这会在执行 GRPC 和 HTTP 调用时阻止事件循环吗?那么如果这些调用需要几秒钟才能完成,你的整个事件循环会阻塞几秒钟吗?为避免这种情况,您需要使用属于 async 的 GRPC 或 HTTP 库。然后您可以例如执行 await response = requests.get(URL)。不?
不幸的是,在尝试这个时,我发现在 requests 周围创建一个包装器几乎比同步调用 URL 列表快(在某些情况下更慢)。例如,使用上述策略请求一个需要 3 秒响应 10 次的端点大约需要 30 秒。如果您想要真正的 async 性能,您需要使用类似 aiohttp 的东西。
@arshbot 是的,如果您的杂务是异步的,那么尽管等待对 requests.get 的同步调用,您仍会看到速度提升。但问题是如何使用 python requests 库执行异步请求。这个答案没有做到这一点,所以我的批评是成立的。
我认为这应该被颠倒。使用异步事件循环似乎足以触发异步请求。无需安装外部依赖项。
@iedmrc 可悲的是,事实并非如此。对于非阻塞任务,它必须使用 Python 中较新的异步工具来实现,而 requests 库并非如此。如果您只是在异步事件循环中使用棒请求任务,那么这些任务仍然会阻塞。话虽如此,您可以(如其他响应中所建议的那样)使用 gevent 或带有请求的线程之类的东西,但肯定不能使用 asyncio。
U
Uri

为此,您可以使用 httpx

import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))

如果您需要函数式语法,gamla 库会将其包装到 get_async 中。

然后你可以做


await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])

10 是以秒为单位的超时。

(免责声明:我是它的作者)


respx 用于模拟/测试 :)
嗨@Uri,我在尝试你在这个答案中提到的代码时遇到了错误。 await asyncio.gather(*map(get_async, urls)) ^ SyntaxError: invalid syntax请指导
请注意,您需要异步上下文才能使用 await
M
Monkey Boson

我知道这已经关闭了一段时间,但我认为推广另一个基于 requests 库的异步解决方案可能会很有用。

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

文档在这里:http://pythonhosted.org/simple-requests/


@YSY 随意发布问题:github.com/ctheiss/simple-requests/issues;我实际上每天使用这个库数千次。
波士顿,您如何处理 404/500 错误? https 网址呢?将欣赏支持数千个网址的剪辑。你能贴一个例子吗?谢谢
@YSY 默认情况下 404/500 错误会引发异常。可以覆盖此行为(请参阅 pythonhosted.org/simple-requests/…)。由于对 gevent 的依赖,HTTPS url 很棘手,目前在这方面存在一个突出的错误 (github.com/gevent/gevent/issues/477)。您可以运行票证中的 shim,但它仍会向 SNI 服务器发出警告(但它工作)。至于剪辑,恐怕我所有的用法都在我的公司并且关闭了。但我向您保证,我们在数十个作业中执行数千个请求。
图书馆在交互方面看起来很时尚。 Python3+ 可以用吗?抱歉,没有看到任何提及。
@Jethro 绝对正确,该库需要完全重写,因为 Python 3 中的底层技术完全不同。目前,该库是“完整的”,但仅适用于 Python 2。
T
Tom Christie

如果您想使用 asyncio,那么 requests-asyncrequests - https://github.com/encode/requests-async 提供 async/await 功能


已确认,效果很好。在项目页面上,它说这项工作已被以下项目 github.com/encode/httpx 取代
v
vaskrneup

免责声明:Following code creates different threads for each function

这可能对某些情况有用,因为它更易于使用。但是要知道它不是异步的,而是使用多个线程产生异步的错觉,即使装饰器建议这样做。

您可以使用以下装饰器在函数执行完成后给出回调,回调必须处理函数返回的数据。

请注意,在函数被修饰后,它将返回一个 Future 对象。

import asyncio

## Decorator implementation of async runner !!
def run_async(callback, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    def inner(func):
        def wrapper(*args, **kwargs):
            def __exec():
                out = func(*args, **kwargs)
                callback(out)
                return out

            return loop.run_in_executor(None, __exec)

        return wrapper

    return inner

实施示例:

urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = []  # OPTIONAL, used for showing realtime, which urls are loaded !!


def _callback(resp):
    print(resp.url)
    print(resp)
    loaded_urls.append((resp.url, resp))  # OPTIONAL, used for showing realtime, which urls are loaded !!


# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
    return requests.get(url)


for url in urls:
    get(url)

如果您希望查看实时加载的 url,您也可以在末尾添加以下代码:

while True:
    print(loaded_urls)
    if len(loaded_urls) == len(urls):
        break

这可行,但它会为每个请求生成一个新线程,这似乎违背了使用 asyncio 的目的。
@rtaft 谢谢你的建议,我已经纠正了我的话。
D
Demitri
from threading import Thread

threads=list()

for requestURI in requests:
    t = Thread(target=self.openURL, args=(requestURI,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

...

def openURL(self, requestURI):
    o = urllib2.urlopen(requestURI, timeout = 600)
    o...

这是线程中的“正常”请求。不错的例子,购买是题外话。
L
Louis Maddox

我支持 suggestion above 使用 HTTPX,但我经常以不同的方式使用它,所以我添加了我的答案。

我个人使用 asyncio.run (introduced in Python 3.7) 而不是 asyncio.gather,也更喜欢 aiostream 方法,它可以与 asyncio 和 httpx 结合使用。

正如我刚刚发布的 this example 中一样,这种样式有助于异步处理一组 URL,即使(常见)错误发生也是如此。我特别喜欢这种风格如何阐明响应处理发生的位置以及便于错误处理(我发现异步调用往往会提供更多)。

发布一个异步触发一堆请求的简单示例更容易,但通常您还想处理响应内容(用它计算一些东西,也许参考您请求的 URL 的原始对象) .

该方法的核心如下所示:

async with httpx.AsyncClient(timeout=timeout) as session:
    ws = stream.repeat(session)
    xs = stream.zip(ws, stream.iterate(urls))
    ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
    process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
    zs = stream.map(ys, process)
    return await zs

在哪里:

process_thing 是一个异步响应内容处理函数

things 是输入列表(URL 字符串的 urls 生成器来自),例如对象/字典列表

pbar 是一个进度条(例如 tqdm.tqdm)[可选但有用]

所有这些都放在一个异步函数 async_fetch_urlset 中,然后通过调用名为例如 fetch_things 的同步“顶级”函数来运行该函数,该函数运行协程 [这是异步函数返回的内容] 并管理事件循环:

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))

由于可以就地修改作为输入传递的列表(此处为 things),因此您可以有效地取回输出(就像我们习惯于从同步函数调用中一样)


D
David Watson

一段时间以来,我一直在使用 python 请求对 github 的 gist API 进行异步调用。

例如,请参阅此处的代码:

https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72

这种风格的 python 可能不是最清楚的例子,但我可以向你保证,代码是有效的。如果这让您感到困惑,请告诉我,我会记录下来。


S
Sam

我还尝试了一些使用 python 中的异步方法的东西,但是我在使用 twisted 进行异步编程时有更好的运气。它的问题更少,并且有据可查。这是与您尝试扭曲的内容类似的链接。

http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html


Twisted 是老式的。请改用 HTTPX。
E
Eliav Louski

上面的答案都没有帮助我,因为他们假设您有一个预定义的请求列表,而在我的情况下,我需要能够监听请求并异步响应(类似于它在 nodejs 中的工作方式)。

def handle_finished_request(r, **kwargs):
    print(r)


# while True:
def main():
    while True:
        address = listen_to_new_msg()  # based on your server

        # schedule async requests and run 'handle_finished_request' on response
        req = grequests.get(address, timeout=1, hooks=dict(response=handle_finished_request))
        job = grequests.send(req)  # does not block! for more info see https://stackoverflow.com/a/16016635/10577976


main()

收到响应时将调用 handle_finished_request 回调。注意:由于某种原因超时(或无响应)不会在这里触发错误

这个简单的循环可以触发异步请求,类似于它在 nodejs 服务器中的工作方式