ChatGPT解决这个技术问题 Extra ChatGPT

在 Flask 中创建一个异步任务

我正在用 Flask 编写一个应用程序,除了 WSGI 是同步和阻塞的之外,它工作得非常好。我有一项特别需要调用第三方 API 的任务,而该任务可能需要几分钟才能完成。我想打那个电话(实际上是一系列电话)并让它运行。而控制权返回给 Flask。

我的观点如下:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

现在,我想做的是有这条线

final_file = audio_class.render_audio()

run 并在方法返回时提供要执行的回调,而 Flask 可以继续处理请求。这是我需要 Flask 异步运行的唯一任务,我想要一些关于如何最好地实现它的建议。

我看过 Twisted 和 Klein,但我不确定它们是否矫枉过正,因为也许 Threading 就足够了。或者也许芹菜是一个不错的选择?

我通常为此使用 celery ...这可能有点过头了,但 afaik 线程在 Web 环境中效果不佳(iirc ...)
正确的。是的 - 我只是在调查 Celery。这可能是一个很好的方法。用 Flask 容易实现吗?
嘿,我也倾向于使用套接字服务器(flask-socketio),是的,我认为这很容易......最难的部分是安装所有东西
我会建议检查 this。这家伙为烧瓶写了很棒的教程,这一篇非常适合理解如何将异步任务集成到烧瓶应用程序中。

A
Alen Paul Varghese

我将使用 Celery 为您处理异步任务。您需要安装一个代理作为您的任务队列(推荐使用 RabbitMQ 和 Redis)。

app.py

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

运行您的 Flask 应用程序,并启动另一个进程来运行您的 celery worker。

$ celery worker -A app.celery --loglevel=debug

我还想参考 Miguel Gringberg 的 write up,以获得更深入的指南来使用 Celery 和 Flask。


Celery 是一个可靠的解决方案,但它不是一个轻量级的解决方案,需要一段时间来设置。
Celery 启动命令不再起作用。显然 -A 在 5.0 版中已作为参数被删除:The support for this usage was removed in Celery 5.0
u
user

线程是另一种可能的解决方案。尽管基于 Celery 的解决方案更适合大规模应用程序,但如果您不期望相关端点上有太多流量,线程是一个可行的替代方案。

此解决方案基于 Miguel Grinberg's PyCon 2016 Flask at Scale presentation,特别是他的幻灯片中的 slide 41。他的 code is also available on github 适合那些对原始来源感兴趣的人。

从用户的角度来看,代码的工作方式如下:

您调用执行长时间运行任务的端点。此端点返回 202 Accepted 并带有一个链接以检查任务状态。在任务仍在运行时调用状态链接返回 202,并在任务完成时返回 200(和结果)。

要将 api 调用转换为后台任务,只需添加 @async_api 装饰器。

这是一个完整的示例:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)


当我使用此代码时,出现错误 werkzeug.routing.BuildError: Could not build url for endpoint 'gettaskstatus' with values ['task_id'] 我错过了什么吗?
@NicolasDufaur 当我将案例修复为 GetTaskStatus 时,我的工作正常
D
David Faber

您也可以尝试将 multiprocessing.Processdaemon=True 一起使用; process.start() 方法不会阻塞,您可以在昂贵的函数在后台执行时立即向调用者返回响应/状态。

我在使用 falcon 框架和使用 daemon 过程时遇到了类似的问题。

您需要执行以下操作:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

您应该会立即得到响应,并且在 10 秒后您应该会在控制台中看到打印的消息。

注意:请记住,不允许 daemonic 进程产生任何子进程。


异步是一种既不是线程也不是多处理的并发类型。然而,线程在目的上更接近异步任务,
我不明白你的意思。作者说的是异步任务,它是“在后台”运行的任务,这样调用者在得到响应之前不会阻塞。产生一个守护进程是可以实现这种异步的一个例子。
如果 /render/<id> 端点期望 my_func() 的结果是什么?
例如,您可以让 my_func 向其他端点发送响应/心跳。或者您可以建立和共享一些消息队列,您可以通过它与 my_func 进行通信
A
Anand Tripathi

烧瓶 2.0

Flask 2.0 现在支持异步路由。您可以使用 httpx 库并为此使用 asyncio 协程。你可以改变你的代码有点像下面

@app.route('/render/<id>', methods=['POST'])
async def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file =  await asyncio.gather(
        audio_class.render_audio(data=text_list),
        do_other_stuff_function()
    )
    # Just make sure that the coroutine should not  having any blocking calls inside it. 
    return Response(
        mimetype='application/json',
        status=200
    )

以上只是一个伪代码,但您可以查看 asyncio 如何与 flask 2.0 一起使用,对于 HTTP 调用,您可以使用 httpx。还要确保协程只执行一些 I/O 任务。


还有 Quart 遵循 Flask 的相同结构。每当您想充分利用异步代码的潜力时,建议您使用 Quart。烧瓶:flask.palletsprojects.com/en/2.0.x/async-await 夸脱:pgjones.gitlab.io/quart/index.html
该实现有一个错误: raise TypeError(f'Object of type {o.__class__.__name__} ' TypeError: Object of type coroutine is not JSON serializable
虽然您关于 Flask >=2.0 中的 async 支持的声明是正确的,但问题提问者的问题无法通过这种方法解决,因为 Flask 不支持后台任务(他们在其文档中也 point out )。
B
Binh Ho

如果您使用 redis,您可以使用 Pubsub 事件来处理后台任务。查看更多:https://redis.com/ebook/part-2-core-concepts/chapter-3-commands-in-redis/3-6-publishsubscribe/