我尝试了 requests library for python 文档中提供的示例。
使用 async.map(rs)
,我获得了响应代码,但我想获得请求的每个页面的内容。例如,这不起作用:
out = async.map(rs)
print out[0].content
笔记
以下答案不适用于请求 v0.13.0+。编写此问题后,异步功能已移至 grequests。但是,您可以将 requests
替换为下面的 grequests
,它应该可以工作。
我留下这个答案是为了反映关于使用 requests < v0.13.0 的原始问题。
要使用async.map
异步 执行多项任务,您必须:
为您想要对每个对象(您的任务)执行的操作定义一个函数将该函数添加为您的请求中的事件挂钩 在所有请求/操作的列表上调用 async.map
例子:
from requests import async
# If using requests > v0.13.0, use
# from grequests import async
urls = [
'http://python-requests.org',
'http://httpbin.org',
'http://python-guide.org',
'http://kennethreitz.com'
]
# A simple task to do to each response object
def do_something(response):
print response.url
# A list to hold our things to do via async
async_list = []
for u in urls:
# The "hooks = {..." part is where you define what you want to do
#
# Note the lack of parentheses following do_something, this is
# because the response will be used as the first argument automatically
action_item = async.get(u, hooks = {'response' : do_something})
# Add the task to our list of things to do via async
async_list.append(action_item)
# Do our list of things to do via async
async.map(async_list)
async
现在是一个独立的模块:grequests
。
见这里:https://github.com/kennethreitz/grequests
还有:Ideal method for sending multiple HTTP requests over Python?
安装:
$ pip install grequests
用法:
建立一个堆栈:
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
rs = (grequests.get(u) for u in urls)
发送堆栈
grequests.map(rs)
结果看起来像
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
grequests 似乎没有为并发请求设置限制,即当多个请求发送到同一服务器时。
results = grequests.map(rs)
这行之后的代码被阻塞,我可以看到异步效果吗?
我测试了 requests-futures 和 grequests。 Grequests 更快,但会带来猴子补丁和其他依赖问题。 requests-futures 比 grequests 慢几倍。我决定编写自己的并将请求简单地包装到 ThreadPoolExecutor 中,它几乎与 grequests 一样快,但没有外部依赖项。
import requests
import concurrent.futures
def get_urls():
return ["url1","url2"]
def load_url(url, timeout):
return requests.get(url, timeout = timeout)
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
future_to_url = {executor.submit(load_url, url, 10): url for url in get_urls()}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
resp_err = resp_err + 1
else:
resp_ok = resp_ok + 1
不幸的是,据我所知,请求库不具备执行异步请求的能力。您可以将 async/await
语法包装在 requests
周围,但这将使底层请求的同步性不会降低。如果您想要真正的异步请求,则必须使用提供它的其他工具。一种这样的解决方案是 aiohttp
(Python 3.5.3+)。根据我使用 Python 3.7 async/await
语法的经验,它运行良好。下面我编写了三个执行 n 个 Web 请求的实现,使用
使用 Python requests 库的纯同步请求 (sync_requests_get_all) 使用 Python 3.7 async/await 语法和 asyncio 包装的 Python requests 库的同步请求 (async_requests_get_all) 使用 Python 3.7 async/await 包装的 Python aiohttp 库的真正异步实现 (async_aiohttp_get_all)语法和异步
"""
Tested in Python 3.5.10
"""
import time
import asyncio
import requests
import aiohttp
from asgiref import sync
def timed(func):
"""
records approximate durations of function calls
"""
def wrapper(*args, **kwargs):
start = time.time()
print('{name:<30} started'.format(name=func.__name__))
result = func(*args, **kwargs)
duration = "{name:<30} finished in {elapsed:.2f} seconds".format(
name=func.__name__, elapsed=time.time() - start
)
print(duration)
timed.durations.append(duration)
return result
return wrapper
timed.durations = []
@timed
def sync_requests_get_all(urls):
"""
performs synchronous get requests
"""
# use session to reduce network overhead
session = requests.Session()
return [session.get(url).json() for url in urls]
@timed
def async_requests_get_all(urls):
"""
asynchronous wrapper around synchronous requests
"""
session = requests.Session()
# wrap requests.get into an async function
def get(url):
return session.get(url).json()
async_get = sync.sync_to_async(get)
async def get_all(urls):
return await asyncio.gather(*[
async_get(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)
@timed
def async_aiohttp_get_all(urls):
"""
performs asynchronous get requests
"""
async def get_all(urls):
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
return await response.json()
return await asyncio.gather(*[
fetch(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)
if __name__ == '__main__':
# this endpoint takes ~3 seconds to respond,
# so a purely synchronous implementation should take
# little more than 30 seconds and a purely asynchronous
# implementation should take little more than 3 seconds.
urls = ['https://postman-echo.com/delay/3']*10
async_aiohttp_get_all(urls)
async_requests_get_all(urls)
sync_requests_get_all(urls)
print('----------------------')
[print(duration) for duration in timed.durations]
在我的机器上,这是输出:
async_aiohttp_get_all started
async_aiohttp_get_all finished in 3.20 seconds
async_requests_get_all started
async_requests_get_all finished in 30.61 seconds
sync_requests_get_all started
sync_requests_get_all finished in 30.59 seconds
----------------------
async_aiohttp_get_all finished in 3.20 seconds
async_requests_get_all finished in 30.61 seconds
sync_requests_get_all finished in 30.59 seconds
async_aiohttp_get_all()
是一个不错的解决方案。我想出了类似的东西,但在它之外有一个额外的 async def fetch_all(urls): return await asyncio.gather(*[fetch(url) for url in urls])
,我的解决方案为每个 URL 创建单独的 aiohttp.ClientSession()
实例,而通过嵌入本地函数,您可以重用相同的会话......更多 Pythonic IMO。您能否提醒我在存在 get_all()
的情况下使用 sync.async_to_sync()
与在不存在 asyncio.run()
get_all()
的情况下使用的好处?
aiohttp
版本使用 asgiref.sync.async_to_sync
运行它?有没有办法在不包括额外模块的情况下做到这一点?
也许 requests-futures 是另一种选择。
from requests_futures.sessions import FuturesSession
session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)
the office document 中也推荐使用它。如果您不想涉及 gevent,这是一个很好的选择。
ThreadPoolExecutor(max_workers=10)
我发布的大多数答案都有很多问题 - 它们要么使用已被移植但功能有限的已弃用库,要么提供了一个在执行请求时具有太多魔力的解决方案,从而难以处理错误。如果它们不属于上述类别之一,则它们是第 3 方库或已弃用。
一些解决方案纯粹在 http 请求中可以正常工作,但是对于任何其他类型的请求,这些解决方案都达不到要求,这很可笑。这里不需要高度定制的解决方案。
只需使用 python 内置库 asyncio
就足以执行任何类型的异步请求,并为复杂和特定于用例的错误处理提供足够的流动性。
import asyncio
loop = asyncio.get_event_loop()
def do_thing(params):
async def get_rpc_info_and_do_chores(id):
# do things
response = perform_grpc_call(id)
do_chores(response)
async def get_httpapi_info_and_do_chores(id):
# do things
response = requests.get(URL)
do_chores(response)
async_tasks = []
for element in list(params.list_of_things):
async_tasks.append(loop.create_task(get_chan_info_and_do_chores(id)))
async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))
loop.run_until_complete(asyncio.gather(*async_tasks))
它的工作原理很简单。您正在创建一系列您希望异步执行的任务,然后要求循环执行这些任务并在完成后退出。没有因缺乏维护而导致的额外库,也没有缺乏所需的功能。
async
的 GRPC 或 HTTP 库。然后您可以例如执行 await response = requests.get(URL)
。不?
requests
周围创建一个包装器几乎比同步调用 URL 列表快(在某些情况下更慢)。例如,使用上述策略请求一个需要 3 秒响应 10 次的端点大约需要 30 秒。如果您想要真正的 async
性能,您需要使用类似 aiohttp
的东西。
requests.get
的同步调用,您仍会看到速度提升。但问题是如何使用 python requests
库执行异步请求。这个答案没有做到这一点,所以我的批评是成立的。
为此,您可以使用 httpx
。
import httpx
async def get_async(url):
async with httpx.AsyncClient() as client:
return await client.get(url)
urls = ["http://google.com", "http://wikipedia.org"]
# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))
如果您需要函数式语法,gamla 库会将其包装到 get_async
中。
然后你可以做
await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])
10
是以秒为单位的超时。
(免责声明:我是它的作者)
respx
用于模拟/测试 :)
await asyncio.gather(*map(get_async, urls)) ^ SyntaxError: invalid syntax
请指导
await
。
我知道这已经关闭了一段时间,但我认为推广另一个基于 requests 库的异步解决方案可能会很有用。
list_of_requests = ['http://moop.com', 'http://doop.com', ...]
from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
print response.content
文档在这里:http://pythonhosted.org/simple-requests/
如果您想使用 asyncio,那么 requests-async
为 requests
- https://github.com/encode/requests-async 提供 async/await 功能
免责声明:Following code creates different threads for each function
。
这可能对某些情况有用,因为它更易于使用。但是要知道它不是异步的,而是使用多个线程产生异步的错觉,即使装饰器建议这样做。
您可以使用以下装饰器在函数执行完成后给出回调,回调必须处理函数返回的数据。
请注意,在函数被修饰后,它将返回一个 Future
对象。
import asyncio
## Decorator implementation of async runner !!
def run_async(callback, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
def inner(func):
def wrapper(*args, **kwargs):
def __exec():
out = func(*args, **kwargs)
callback(out)
return out
return loop.run_in_executor(None, __exec)
return wrapper
return inner
实施示例:
urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = [] # OPTIONAL, used for showing realtime, which urls are loaded !!
def _callback(resp):
print(resp.url)
print(resp)
loaded_urls.append((resp.url, resp)) # OPTIONAL, used for showing realtime, which urls are loaded !!
# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
return requests.get(url)
for url in urls:
get(url)
如果您希望查看实时加载的 url,您也可以在末尾添加以下代码:
while True:
print(loaded_urls)
if len(loaded_urls) == len(urls):
break
from threading import Thread
threads=list()
for requestURI in requests:
t = Thread(target=self.openURL, args=(requestURI,))
t.start()
threads.append(t)
for thread in threads:
thread.join()
...
def openURL(self, requestURI):
o = urllib2.urlopen(requestURI, timeout = 600)
o...
我支持 suggestion above 使用 HTTPX,但我经常以不同的方式使用它,所以我添加了我的答案。
我个人使用 asyncio.run
(introduced in Python 3.7) 而不是 asyncio.gather
,也更喜欢 aiostream
方法,它可以与 asyncio 和 httpx 结合使用。
正如我刚刚发布的 this example 中一样,这种样式有助于异步处理一组 URL,即使(常见)错误发生也是如此。我特别喜欢这种风格如何阐明响应处理发生的位置以及便于错误处理(我发现异步调用往往会提供更多)。
发布一个异步触发一堆请求的简单示例更容易,但通常您还想处理响应内容(用它计算一些东西,也许参考您请求的 URL 的原始对象) .
该方法的核心如下所示:
async with httpx.AsyncClient(timeout=timeout) as session:
ws = stream.repeat(session)
xs = stream.zip(ws, stream.iterate(urls))
ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
zs = stream.map(ys, process)
return await zs
在哪里:
process_thing 是一个异步响应内容处理函数
things 是输入列表(URL 字符串的 urls 生成器来自),例如对象/字典列表
pbar 是一个进度条(例如 tqdm.tqdm)[可选但有用]
所有这些都放在一个异步函数 async_fetch_urlset
中,然后通过调用名为例如 fetch_things
的同步“顶级”函数来运行该函数,该函数运行协程 [这是异步函数返回的内容] 并管理事件循环:
def fetch_things(urls, things, pbar=None, verbose=False):
return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))
由于可以就地修改作为输入传递的列表(此处为 things
),因此您可以有效地取回输出(就像我们习惯于从同步函数调用中一样)
一段时间以来,我一直在使用 python 请求对 github 的 gist API 进行异步调用。
例如,请参阅此处的代码:
https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72
这种风格的 python 可能不是最清楚的例子,但我可以向你保证,代码是有效的。如果这让您感到困惑,请告诉我,我会记录下来。
我还尝试了一些使用 python 中的异步方法的东西,但是我在使用 twisted 进行异步编程时有更好的运气。它的问题更少,并且有据可查。这是与您尝试扭曲的内容类似的链接。
http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html
上面的答案都没有帮助我,因为他们假设您有一个预定义的请求列表,而在我的情况下,我需要能够监听请求并异步响应(类似于它在 nodejs 中的工作方式)。
def handle_finished_request(r, **kwargs):
print(r)
# while True:
def main():
while True:
address = listen_to_new_msg() # based on your server
# schedule async requests and run 'handle_finished_request' on response
req = grequests.get(address, timeout=1, hooks=dict(response=handle_finished_request))
job = grequests.send(req) # does not block! for more info see https://stackoverflow.com/a/16016635/10577976
main()
收到响应时将调用 handle_finished_request
回调。注意:由于某种原因超时(或无响应)不会在这里触发错误
这个简单的循环可以触发异步请求,类似于它在 nodejs 服务器中的工作方式
不定期副业成功案例分享
from grequests import async
不起作用.. 这个定义对我有用def do_something(response, **kwargs):
,我从 stackoverflow.com/questions/15594015/… 中找到它import grequests as async
替换from requests import async
对我有用。grequests
现在推荐requests-threads
或requests-futures