ChatGPT解决这个技术问题 Extra ChatGPT

Making an asynchronous task in Flask

I am writing an application in Flask, which works really well except that WSGI is synchronous and blocking. I have one task in particular which calls out to a third party API and that task can take several minutes to complete. I would like to make that call (it's actually a series of calls) and let it run. while control is returned to Flask.

My view looks like:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Now, what I want to do is have the line

final_file = audio_class.render_audio()

run and provide a callback to be executed when the method returns, whilst Flask can continue to process requests. This is the only task which I need Flask to run asynchronously, and I would like some advice on how best to implement this.

I have looked at Twisted and Klein, but I'm not sure they are overkill, as maybe Threading would suffice. Or maybe Celery is a good choice for this?

I usually use celery for this ... it might be overkill but afaik threading doesnt work well in web environments (iirc... )
Right. Yeah - I was just investigating Celery. It might be a good approach. Easy to implement with Flask?
heh i tend to use a socket server also (flask-socketio) and yes i thought it was pretty easy... the hardest part was getting everything installed
I would recommend checking this out. This guy writes great tutorials for flask in general, and this one is great for understanding how to integrate asynchronous tasks into a flask app.

A
Alen Paul Varghese

I would use Celery to handle the asynchronous task for you. You'll need to install a broker to serve as your task queue (RabbitMQ and Redis are recommended).

app.py:

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Run your Flask app, and start another process to run your celery worker.

$ celery worker -A app.celery --loglevel=debug

I would also refer to Miguel Gringberg's write up for a more in depth guide to using Celery with Flask.


Celery is a solid solution, but it's not a lightweight solution and takes a while to set up.
The Celery startup command no longer works. Apparently -A was removed as an argument in version 5.0: The support for this usage was removed in Celery 5.0
u
user

Threading is another possible solution. Although the Celery based solution is better for applications at scale, if you are not expecting too much traffic on the endpoint in question, threading is a viable alternative.

This solution is based on Miguel Grinberg's PyCon 2016 Flask at Scale presentation, specifically slide 41 in his slide deck. His code is also available on github for those interested in the original source.

From a user perspective the code works as follows:

You make a call to the endpoint that performs the long running task. This endpoint returns 202 Accepted with a link to check on the task status. Calls to the status link returns 202 while the taks is still running, and returns 200 (and the result) when the task is complete.

To convert an api call to a background task, simply add the @async_api decorator.

Here is a fully contained example:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)


When I use this code, I have the error werkzeug.routing.BuildError: Could not build url for endpoint 'gettaskstatus' with values ['task_id'] Am I missing something ?
@NicolasDufaur mine worked when i fixed the case to GetTaskStatus
D
David Faber

You can also try using multiprocessing.Process with daemon=True; the process.start() method does not block and you can return a response/status immediately to the caller while your expensive function executes in the background.

I experienced similar problem while working with falcon framework and using daemon process helped.

You'd need to do the following:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

You should get a response immediately and, after 10s you should see a printed message in the console.

NOTE: Keep in mind that daemonic processes are not allowed to spawn any child processes.


asynchronous is a certain type of concurrency which is neither threading nor multiprocessing. Threading is however, much more closer in purpose as async task,
I don't understand your point. The author is talking about an asynchronous task, which is the task that runs "in the background", such that the caller does not block until it gets a response. Spawning a deamon process is an example of where such asynchronism can be achieved.
what if /render/<id> endpoint expects something as a result of my_func() ?
You can make my_func send response/heartbeats to some other endpoint for example. Or you can establish and share some message queue through which you can communicate with my_func
A
Anand Tripathi

Flask 2.0

Flask 2.0 supports async routes now. You can use the httpx library and use the asyncio coroutines for that. You can change your code a bit like below

@app.route('/render/<id>', methods=['POST'])
async def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file =  await asyncio.gather(
        audio_class.render_audio(data=text_list),
        do_other_stuff_function()
    )
    # Just make sure that the coroutine should not  having any blocking calls inside it. 
    return Response(
        mimetype='application/json',
        status=200
    )

The above one is just a pseudo code, but you can checkout how asyncio works with flask 2.0 and for HTTP calls you can use httpx. And also make sure the coroutines are only doing some I/O tasks only.


There is also Quart that follow the same structure from Flask. Is recommend to use Quart whenever you want to use full potential from asynchronous code. Flask: flask.palletsprojects.com/en/2.0.x/async-await Quart: pgjones.gitlab.io/quart/index.html
that implementation has an error: raise TypeError(f'Object of type {o.__class__.__name__} ' TypeError: Object of type coroutine is not JSON serializable
While your statement on async support in Flask >=2.0 is true, the question asker's question cannot be resolved with this approach, as Flask does not support background tasks (which they also point out in their documentation).
B
Binh Ho

If you are using redis, you can use Pubsub event to handle background tasks. See more: https://redis.com/ebook/part-2-core-concepts/chapter-3-commands-in-redis/3-6-publishsubscribe/