
How to limit the amount of concurrent async I/O operations?

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Here is the problem, it starts 1000+ simultaneous web requests. Is there an easy way to limit the concurrent amount of these async http requests? So that no more than 20 web pages are downloaded at any given time. How to do it in the most efficient manner?

How is this different from your previous question?
stackoverflow.com/questions/9290498/… With a ParallelOptions parameter.
@ChrisDisley, this will only parallelize the launching of the requests.
Besides HttpClient is IDisposable, and you should dispose it, especially when you're going to use 1000+ of them. HttpClient can be used as a singleton for multiple requests.
@Shimmy you should never dispose HttpClient: stackoverflow.com/a/15708633/1246870

Simon_Weaver

You can definitely do this with the latest versions of async for .NET, using .NET 4.5 Beta. The previous post from 'usr' points to a good article written by Stephen Toub, but the less-publicized news is that the async semaphore actually made it into the Beta release of .NET 4.5.

If you look at our beloved SemaphoreSlim class (which you should be using since it's more performant than the original Semaphore), it now boasts the WaitAsync(...) series of overloads, with all of the expected arguments - timeout intervals, cancellation tokens, all of your usual scheduling friends :)
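For example, a quick sketch of those overloads (the timeout and token values here are arbitrary):

var throttler = new SemaphoreSlim(initialCount: 20);
var cts = new CancellationTokenSource();

// Plain asynchronous wait
await throttler.WaitAsync();

// Wait with a timeout: completes with false if the semaphore wasn't entered in time
bool entered = await throttler.WaitAsync(TimeSpan.FromSeconds(5));

// Wait with a cancellation token: throws OperationCanceledException if cancelled
await throttler.WaitAsync(cts.Token);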

Stephen has also written a more recent blog post about the new .NET 4.5 goodies that came out with the Beta; see What’s New for Parallelism in .NET 4.5 Beta.

Last, here's some sample code showing how to use SemaphoreSlim for async method throttling:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Last, but worth mentioning, is a solution that uses TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and allow a custom task scheduler to limit the concurrency. In fact, there's an MSDN sample for it.

See also TaskScheduler.
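A minimal sketch of that approach, assuming the LimitedConcurrencyLevelTaskScheduler class from the MSDN "task scheduler that limits concurrency" sample (the class itself is not reproduced here):

// LimitedConcurrencyLevelTaskScheduler comes from the MSDN sample; it is assumed
// to be available in the project (it is not part of the framework itself).
var scheduler = new LimitedConcurrencyLevelTaskScheduler(20);
var factory = new TaskFactory(scheduler);

var tasks = urls.Select(url => factory.StartNew(() =>
{
    // Synchronous (blocking) work scheduled through the throttled scheduler.
    // Note: a scheduler limits concurrent tasks/threads; it does not throttle
    // awaited async I/O the way the SemaphoreSlim approach above does.
    using (var client = new WebClient())
    {
        return client.DownloadString(url);
    }
})).ToArray();

Task.WaitAll(tasks);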


isn't a parallel.foreach with a limited degree of parallelism a nicer approach? msdn.microsoft.com/en-us/library/…
Why don't you dispose your HttpClient?
@GreyCloud: Parallel.ForEach works with synchronous code. This allows you to call asynchronous code.
Given how popular this answer is, it's worth pointing out that HttpClient can and should be a single common instance rather than an instance per request.
Task.Run() is necessary here because if you await normally then the requests will be processed one at a time (since it's waiting for the request to finish before continuing the rest of the loop) instead of in parallel. However, if you don't await the request then you will release the semaphore as soon as the task is scheduled (allowing all requests to run at the same time), which defeats the purpose of using it in the first place. The context created by Task.Run is just a place to hold onto the semaphore resource.
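To illustrate the two failure modes that comment describes, here are two sketches (reusing urls, throttler and allTasks from the answer above, plus an assumed shared HttpClient named client):

// Variant A: awaiting each request inline. The loop cannot continue until the download
// completes, so requests run one at a time even though a semaphore is present.
foreach (var url in urls)
{
    await throttler.WaitAsync();
    try { var html = await client.GetStringAsync(url); }
    finally { throttler.Release(); }
}

// Variant B: not awaiting the request and releasing right away. The semaphore is released
// as soon as the request is started, so all 1000+ requests end up in flight at once.
foreach (var url in urls)
{
    await throttler.WaitAsync();
    allTasks.Add(client.GetStringAsync(url));
    throttler.Release();
}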
Liam

If you have an IEnumerable (i.e. a collection of URL strings) and you want to do an I/O-bound operation with each of these (i.e. make an async HTTP request) concurrently, AND optionally you also want to set the maximum number of concurrent I/O requests in real time, here is how you can do that. This way you do not use the thread pool et al.; the method uses SemaphoreSlim to control the maximum number of concurrent I/O requests, similar to a sliding-window pattern: one request completes, leaves the semaphore, and the next one gets in.

usage:

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);
public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
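        // Note: DefaultMaxDegreeOfParallelism is assumed to be a constant (e.g. 4) defined elsewhere in the containing class; it is not shown in this snippet.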
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }

No, you should not need to explicitly dispose the SemaphoreSlim in this implementation and usage, as it is used internally inside the method and the method does not access its AvailableWaitHandle property; if it did, we would have needed to either dispose it or wrap it within a using block.
Just thinking of the best practices and lessons we teach other people. A using would be nice.
Well, this example I can follow, but I'm trying to work out the best way to do this: basically I have a throttler, but my Func would return a list, which I ultimately want gathered into a final list of everything completed when done... which may require a lock on the list. Do you have suggestions?
You can slightly update the method so it returns the list of actual tasks, and you await Task.WhenAll from inside your calling code. Once Task.WhenAll is complete, you can enumerate over each task in the list and add its list to the final list. Change the method signature to 'public static IEnumerable<Task<TOut>> ForEachAsync<TIn, TOut>(IEnumerable<TIn> inputEnumerable, Func<TIn, Task<TOut>> asyncProcessor, int? maxDegreeOfParallelism = null)'.
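For illustration, a variant along the lines of that comment might look like this (a sketch only; the TIn/TOut type parameters and the FetchListForUrlAsync name in the usage example are assumptions, not from the original answer):

public static IEnumerable<Task<TOut>> ForEachAsync<TIn, TOut>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task<TOut>> asyncProcessor,
    int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);

    return inputEnumerable.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            return await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
}

// calling code:
// var tasks = ForEachAsync(urls, FetchListForUrlAsync, 20);
// var lists = await Task.WhenAll(tasks);   // TOut[] in input order; flatten as needed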
0xced

There are a lot of pitfalls, and direct use of a semaphore can be tricky in error cases, so I would suggest using the AsyncEnumerator NuGet package instead of re-inventing the wheel:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);

As noted in prior posts you should not be creating new HttpClients in any kind of loop unless you actually enjoy socket exhaustion issues in production.
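A common pattern is a single shared instance, something like this sketch (in newer code IHttpClientFactory is another option):

public static class Http
{
    // One HttpClient for the lifetime of the application, reused by every request.
    public static readonly HttpClient Client = new HttpClient();
}

// inside the throttled loop:
// var html = await Http.Client.GetStringAsync(url);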
Pang

Unfortunately, the .NET Framework is missing the most important combinators for orchestrating parallel async tasks. There is no such thing built in.

Look at the AsyncSemaphore class built by the most respectable Stephen Toub. What you want is called a semaphore, and you need an async version of it.
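For reference, a minimal async semaphore sketch looks something like the following (an illustration of the idea on modern .NET, not Toub's exact code; as the comments below note, SemaphoreSlim.WaitAsync now covers this):

public class AsyncSemaphore
{
    private readonly Queue<TaskCompletionSource<bool>> _waiters = new Queue<TaskCompletionSource<bool>>();
    private int _currentCount;

    public AsyncSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException(nameof(initialCount));
        _currentCount = initialCount;
    }

    // Returns a completed task while permits remain; otherwise queues a waiter
    // that Release() will complete later.
    public Task WaitAsync()
    {
        lock (_waiters)
        {
            if (_currentCount > 0)
            {
                _currentCount--;
                return Task.CompletedTask;
            }
            var waiter = new TaskCompletionSource<bool>();
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (_waiters)
        {
            if (_waiters.Count > 0)
                toRelease = _waiters.Dequeue();
            else
                _currentCount++;
        }
        // Complete the waiter outside the lock so its continuations don't run while the lock is held.
        toRelease?.SetResult(true);
    }
}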


Note that "Unfortunately, the .NET Framework is missing most important combinators for orchestrating parallel async tasks. There is no such thing built-in." is no longer correct as of .NET 4.5 Beta. SemaphoreSlim now offers WaitAsync(...) functionality :)
Should SemaphoreSlim (with its new async methods) be preferred over AsyncSemphore, or does Toub's implementation still have some advantage?
In my opinion, the built-in type should be preferred because it is likely to be well-tested and well-designed.
Stephen added a comment in response to a question on his blog post confirming that using SemaphoreSlim for .NET 4.5 would generally be the way to go.
Theodor Zoulias

After the release of .NET 6 (in November 2021), the recommended way of limiting the number of concurrent asynchronous I/O operations is the Parallel.ForEachAsync API, with the MaxDegreeOfParallelism configuration. Here is how it can be used in practice:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };
var client = new HttpClient();
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };

// now let's send HTTP requests to each of these URLs in parallel
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

In the above example the Parallel.ForEachAsync task is awaited asynchronously. You can also Wait it synchronously if you need to, which will block the current thread until the completion of all asynchronous operations. The synchronous Wait has the advantage that in case of errors, all exceptions will be propagated. On the contrary the await operator propagates by design only the first exception. In case this is a problem, you can find solutions here.
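For example, the blocking variant mentioned above might look like this (a sketch of the Wait-based pattern the answer describes, reusing urls, options and client from the snippet above):

Task forEachTask = Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

try
{
    forEachTask.Wait(); // blocks the current thread until all operations complete
}
catch (AggregateException aex)
{
    // Wait() surfaces an AggregateException, so every failed operation can be inspected.
    // 'await forEachTask' would instead rethrow only the first exception.
    foreach (Exception ex in aex.InnerExceptions)
        Console.WriteLine(ex.Message);
}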

(Note: an idiomatic implementation of a ForEachAsync extension method that also propagates the results can be found in the 4th revision of this answer.)


A Parallel.ForEachAsync-based implementation that returns a Task<TResult[]> can be found here.
This is the correct answer now but it's at the bottom here. Yes this question is very old but it's the first Google search hit. Wish we could improve this.
Jay Shah

SemaphoreSlim can be very helpful here. Here's the extension method I've created.

    /// <summary>
    /// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">an async action (a Func returning a Task) to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max number of actions to run in parallel.
    /// Must be greater than 0.</param>
    /// <returns>A Task representing the async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await action(item);
                        }
                        finally
                        {
                            // action is completed, so decrement the number of currently running tasks.
                            // A try/finally is used (rather than ContinueWith) so that exceptions thrown
                            // by the action still propagate to the Task.WhenAll below.
                            semaphoreSlim.Release();
                        }
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

Is there still nothing built into the framework that does this?
Did you ever make a SelectAsyncConcurrent version of this?
@Simon_Weaver I don't think framework has any built-in mechanism for this as of now.
@Simon_Weaver No, I have not built SelectAsyncConcurrent version, but that would be an interesting implementation.
I just made a very clumsy one that simply calls ForEachAsyncConcurrent. I only needed it in one place so it was fine. I just created a ConcurrentStack and added items to it inside a call to your function. The ordering wasn't important for me, but if anyone else attempts it don't use a List because a) it's not thread safe and b) the results may not come back in the same order anyway.
scottm

Although 1000 tasks might be queued very quickly, the Parallel Tasks library can only handle concurrent tasks equal to the number of CPU cores in the machine. That means that if you have a four-core machine, only 4 tasks will be executing at a given time (unless you lower the MaxDegreeOfParallelism).


Yep, but that doesn't relate to async I/O operations. The code above will fire up 1000+ simultaneous downloads even if it is running on a single thread.
Didn't see the await keyword in there. Removing that should solve the problem, correct?
The library certainly can handle more tasks running (with the Running status) concurrently than the amount of cores. This will be especially the case with a I/O bound Tasks.
@svick: yep. Do you know how to efficiently control the max concurrent TPL tasks (not threads)?
symbiont

This is not good practice, as it changes a global variable, and it is not a general solution for async. But it is easy for all instances of HttpClient, if that's all you're after. You can simply try:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;

Alain

Here is a handy Extension Method you can create to wrap a list of tasks such that they will be executed with a maximum degree of concurrency:

/// <summary>Allows doing any async operation in bulk while limiting the system to a number of concurrent items being processed.</summary>
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(this IEnumerable<Task<T>> tasks, int maxParallelism)
{
    SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
    // Each original task gets wrapped in a new task that must first acquire the semaphore,
    // and releases it only once the original task has completed.
    // Note: if the source tasks are already running ("hot"), this limits only how many are
    // awaited at a time, not how many of the underlying operations execute concurrently.
    return tasks.Select(async task =>
    {
        await maxOperations.WaitAsync();
        try { return await task; }
        finally { maxOperations.Release(); }
    });
}

Now instead of:

await Task.WhenAll(someTasks);

You can go

await Task.WhenAll(someTasks.WithMaxConcurrency(20));

Community

Parallel computations should be used for speeding up CPU-bound operations. Here we are talking about I/O bound operations. Your implementation should be purely async, unless you're overwhelming the busy single core on your multi-core CPU.

EDIT I like the suggestion made by usr to use an "async semaphore" here.


Good point! Though each task here will contain async and sync code (page downloaded asynchronously then processed in a sync manner). I am trying to distribute the sync portion of the code across CPUs and at the same time limit the amount of concurrent async I/O operations.
Why? Because launching 1000+ http requests simultaneously might not be a task well suited to the user's network capacity.
Parallel extensions can also be used as a way to multiplex I/O operations without having to manually implement a pure async solution. Which I agree could be considered sloppy, but as long as you keep a tight limit on the number of concurrent operations it probably won't strain the threadpool too much.
I don't think this answer is providing an answer. Being purely async is not enough here: We really want to throttle the physical IOs in a non-blocking manner.
Hmm.. not sure I agree... when working on a large project, if one too many developers takes this view, you'll get starvation even though each developer's contribution in isolation is not enough to tip things over the edge. Given that there is only one ThreadPool, even if you're treating it semi-respectfully... if everyone else is doing the same, trouble can follow. As such I always advise against running long stuff in the ThreadPool.
deadlydog

Essentially you're going to want to create an Action or Task for each URL that you want to hit, put them in a List, and then process that list, limiting the number that can be processed in parallel.

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 20 threads in parallel.

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Action here; Parallel.Invoke will run it below.
    listOfActions.Add(() => CallUrl(localUrl));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

With Tasks there is no built-in function. However, you can use the one that I provide on my blog.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

And then creating your list of Tasks and calling the function to have them run, with say a maximum of 20 simultaneous at a time, you could do this:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);

I think you are only specifying initialCount for SemaphoreSlim; you also need to specify the 2nd parameter, i.e. maxCount, in the constructor of SemaphoreSlim.
I want the response from each task collected into a List. How can I get the returned Result or response?
JDawg

Use MaxDegreeOfParallelism, which is an option you can specify in Parallel.ForEach():

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
            var html = client.GetStringAsync(url);
            // do stuff with html
        });

I don't think this works. GetStringAsync(url) is meant to be called with await. If you inspect the type of var html, it is a Task<string>, not the result string.
@NealEhardt is correct. Parallel.ForEach(...) is intended for running blocks of synchronous code in parallel (e.g. on different threads).
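To make that concrete: inside Parallel.ForEach you would have to block on the task to keep the MaxDegreeOfParallelism limit meaningful, e.g. (a sketch; blocking like this ties up a thread-pool thread per in-flight request, which is why the SemaphoreSlim and Parallel.ForEachAsync approaches above are generally preferable):

Parallel.ForEach(urls, options, url =>
{
    var client = new HttpClient();

    // GetStringAsync returns a Task<string>; without awaiting or blocking on it,
    // the loop body finishes while the download is still in flight.
    Task<string> htmlTask = client.GetStringAsync(url);

    // Blocking for the result keeps at most 20 downloads running, but wastes a
    // thread-pool thread for the whole duration of each request.
    string html = htmlTask.GetAwaiter().GetResult();

    // do stuff with html
});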