
Parallel foreach with asynchronous lambda

I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.

The trouble arises when I want to call a method marked async in C# within the lambda of the parallel loop. For example:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

The problem is that count ends up being 0: Parallel.ForEach doesn't wait for the async lambdas to complete, because each lambda hands control back to the loop at its first await and the rest of it finishes later in the background. If I remove the async keyword, the method looks like this:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

It works, but it completely disables the await cleverness, and I have to do some manual exception handling (removed for brevity).

How can I implement a Parallel.ForEach loop that uses the await keyword within the lambda? Is it possible?

The signature of Parallel.ForEach takes an Action<T> as a parameter, but I want it to wait for my asynchronous lambda.
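To make the problem concrete, here is a minimal sketch (the class and method names are hypothetical, and Task.Delay stands in for GetData) contrasting the async lambda passed to Parallel.ForEach, which the compiler turns into an async void Action<int>, with the same body passed as a Func<int, Task> whose Tasks can be awaited:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncLambdaPitfall
{
    // The async lambda is converted to Action<int>, i.e. an async void method.
    // Parallel.ForEach only sees each delegate return at its first await,
    // so it has nothing to wait on for the rest of the lambda.
    public static int CountAfterParallelForEach(int n)
    {
        var bag = new ConcurrentBag<int>();
        Parallel.ForEach(Enumerable.Range(0, n), async i =>
        {
            await Task.Delay(200); // stand-in for GetData(item)
            bag.Add(i);
        });
        return bag.Count; // typically 0: every lambda is still inside its delay
    }

    // The same body as Func<int, Task>: the caller gets Tasks it can await.
    public static async Task<int> CountAfterWhenAll(int n)
    {
        var bag = new ConcurrentBag<int>();
        await Task.WhenAll(Enumerable.Range(0, n).Select(async i =>
        {
            await Task.Delay(200);
            bag.Add(i);
        }));
        return bag.Count; // all n items have been added
    }
}
```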

I assume you meant to remove await from await GetData(item) in your second code block as it would produce a compilation error as-is.
Possible duplicate of Nesting await in Parallel.ForEach
As a side note, the ConcurrentBag<T> is a very specialized collection. A ConcurrentQueue<T> would serve you better in this case.

Rocklan

If you just want simple parallelism, you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsync post.


Probably a throttling mechanism is needed. This will immediately create as many tasks as there are items, which might end up as 10k network requests and such.
@usr The last example in Stephen Toub's article addresses that.
@LukePuplett It creates dop tasks and each of them then processes some subset of the input collection in series.
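A sketch along the lines of that description, assuming .NET's Partitioner.Create(IEnumerable<T>).GetPartitions(int): it starts dop workers, and each worker awaits the body for the items of its own partition one after another. The class name PartitionedForEach is hypothetical, and the code is not necessarily identical to the article's:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionedForEach
{
    // Starts `dop` workers; each one pulls items from its partition of `source`
    // and awaits `body` for them in series.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        if (dop < 1) throw new ArgumentOutOfRangeException(nameof(dop));

        return Task.WhenAll(
            Partitioner.Create(source).GetPartitions(dop).Select(partition => Task.Run(async () =>
            {
                // Each partition enumerator is used by exactly one worker.
                using (partition)
                {
                    while (partition.MoveNext())
                    {
                        await body(partition.Current);
                    }
                }
            })));
    }
}
```

Because each worker awaits its items one at a time, at most dop bodies are in flight, and no task is created per item.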
@Afshin_Zavvar: If you call Task.Run without awaiting the result, then that's just throwing fire-and-forget work onto the thread pool. That is almost always a mistake.
A simple throttling mechanism for this approach is to split your list into small lists of N entries, and perform this task select + Task.WhenAll for each smaller batch. This way you don't spawn thousands of tasks for large data sets.
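A minimal sketch of that batching idea, assuming .NET 6 or later for Enumerable.Chunk (ForEachInBatchesAsync and BatchedForEach are hypothetical names):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchedForEach
{
    // Processes `source` in consecutive batches of `batchSize`: the items of one
    // batch run concurrently, and the next batch starts only after the whole
    // current batch has finished.
    public static async Task ForEachInBatchesAsync<T>(
        this IEnumerable<T> source, int batchSize, Func<T, Task> body)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        foreach (T[] batch in source.Chunk(batchSize))
        {
            await Task.WhenAll(batch.Select(body));
        }
    }
}
```

The trade-off compared with a SemaphoreSlim throttle is that one slow item holds up the start of the next batch, so fewer than batchSize operations may be running for a while.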
Ian Kemp

You can use the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.


This is your package? I have seen you post this in a few places now? :D Oh wait.. your name is on the package :D +1
@ppumkin, yes, it's mine. I've seen this problem over and over again, so decided to solve it in simplest way possible and free others from struggling as well :)
you have a typo: maxDegreeOfParallelism > maxDegreeOfParalellism
The correct spelling is indeed maxDegreeOfParallelism, however there's something in @ShiranDror's comment - in your package you called the variable maxDegreeOfParalellism by mistake (and therefore your quoted code won't compile until you change it..)
@SergeSemenov In that case I think you might want to update the link in this answer, since it points at V1.10. Since you're active on this question, I will leave that to you.
Ian Kemp

One of the new .NET 6 APIs is Parallel.ForEachAsync, a way to schedule asynchronous work that allows you to control the degree of parallelism:

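using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

// The cache folder must exist before files can be created in it.
var cacheDir = Path.Combine(Path.GetTempPath(), "http_cache");
Directory.CreateDirectory(cacheDir);

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    // A full URL ("https://...") is not a valid file name, so use its host name.
    var targetPath = Path.Combine(cacheDir, new Uri(url).Host);

    // Forward the loop's token so pending requests stop if the loop is cancelled.
    using var response = await client.GetAsync(url, token);

    if (response.IsSuccessStatusCode)
    {
        // File.Create truncates any previous copy of the file.
        using var target = File.Create(targetPath);

        await response.Content.CopyToAsync(target, token);
    }
});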

Another example can be found in Scott Hanselman's blog.

The source, for reference.
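To check the throttling behavior, here is a small sketch, assuming .NET 6 or later, that records how many bodies are in flight at once; Task.Delay stands in for the HTTP request, and ForEachAsyncDemo is a hypothetical name. The body passed to Parallel.ForEachAsync returns a ValueTask and receives a CancellationToken, which should be forwarded to the awaited calls:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Runs 20 simulated requests with at most `maxDop` in flight and
    // returns the highest concurrency that was actually observed.
    public static async Task<int> MaxObservedConcurrencyAsync(int maxDop)
    {
        int current = 0, max = 0;
        var gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDop };

        await Parallel.ForEachAsync(Enumerable.Range(0, 20), options, async (i, token) =>
        {
            lock (gate) { current++; max = Math.Max(max, current); }
            await Task.Delay(10, token); // stand-in for client.GetAsync(url, token)
            lock (gate) { current--; }
        });

        return max;
    }
}
```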


Felipe l

With SemaphoreSlim you can achieve parallelism control.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
using var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;

SemaphoreSlim should be wrapped with a using statement because it implements IDisposable
Also, this line "await throttler.WaitAsync();" should NOT be within the try block because if it throws an exception you would be calling Release when you haven't acquired the lock.
@tim assuming your comment is resolved, can you remove it please (and/or @ me if I forget to remove this after you have!)?
@Tim Thank you Tim. I made the necessary changes.
Alex from Jitbit

Simplest possible extension method, compiled from other answers and the article referenced by the accepted answer:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

UPDATE: here's a simple modification that also supports a cancellation token, as requested in the comments (untested):

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);

        try
        {
            // Checking inside the try ensures the slot acquired above is always released.
            if (cancellationToken.IsCancellationRequested) return;

            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

I would upvote it, but I don't like the arbitrary int maxDegreeOfParallelism = 10. Better let the user specify the level of concurrency explicitly. You could also consider adding a .ConfigureAwait(false) after the asyncAction(item) (but not after the throttler.WaitAsync()).
@TheodorZoulias both good points, edited. Also working on a variant that supports a cancellationToken; will post it later.
It is worth noting that most solutions based on Task.WhenAll are only suitable for a relatively small number of tasks, or for asyncActions that are guaranteed not to throw. Otherwise it can be very frustrating to wait half an hour for the completion of 10,000 tasks, and just receive an exception as a result (that could have been thrown by the very first task).
This is NOT a robust solution, for 2 reasons. First, if an exception is thrown, it will not terminate the loop. Second, the throttler is not disposed.
@zmechanic I think it's up to the developer whether or not to abort the loop on exception.
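If you do want to stop early, one option is a sketch like the following, assuming the SemaphoreSlim approach above: a linked CancellationTokenSource is cancelled by the first failing action, so items still waiting for a slot are skipped, running actions can observe the token, and the throttler is disposed. FailFastForEach is a hypothetical name:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FailFastForEach
{
    public static async Task ParallelForEachAsync<T>(
        this IEnumerable<T> source,
        Func<T, CancellationToken, Task> asyncAction,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        using var throttler = new SemaphoreSlim(maxDegreeOfParallelism);

        var tasks = source.Select(async item =>
        {
            // Throws OperationCanceledException once cts is cancelled,
            // so waiting items never start (and never call Release).
            await throttler.WaitAsync(cts.Token).ConfigureAwait(false);
            try
            {
                await asyncAction(item, cts.Token).ConfigureAwait(false);
            }
            catch
            {
                cts.Cancel(); // first failure stops handing out new slots
                throw;
            }
            finally
            {
                throttler.Release();
            }
        }).ToList();

        // A faulted task takes precedence over cancelled ones, so the original
        // exception is what the caller sees.
        await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```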
nicolas2008

My lightweight implementation of ParallelForEach async.

Features:

- Throttling (max degree of parallelism).
- Exception handling (an AggregateException will be thrown at completion).
- Memory efficient (no need to store the list of tasks).

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
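            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                // Record the failure before releasing, so no completion check
                // can observe the final count before the exception is stored.
                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                semaphoreSlim.Release();

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        // Covers an empty source, or all actions finishing before the flag was set.
        if (semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
        {
            tcs.TrySetResult(null);
        }
        await tcs.Task;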
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

Usage example:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);

tcs.SetResult(null) needs to be replaced with tcs.TrySetResult(null).
@Hocas, why do you think TrySetResult is needed?
The last time I used this code, I had a problem with SetResult being called multiple times. See: When to use SetResult() vs TrySetResult()
@Hocas, that's interesting. tcs.SetResult(null) is not expected to be executed twice.
Using the CurrentCount property of the SemaphoreSlim for controlling the execution flow is not a good idea. In most cases it creates race conditions. Using the Volatile.Read is also shaky (another possible race condition). I wouldn't trust this solution in a production environment.
Jay Shah

I've created an extension method for this, which makes use of SemaphoreSlim and also allows you to set the maximum degree of parallelism:

    /// <summary>
    /// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the items in the <see cref="IEnumerable{T}"/></typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">an async function, returning a <see cref="Task"/>, to execute for each item</param>
    /// <param name="maxDegreeOfParallelism">Optional. An integer that represents the maximum degree of parallelism.
    /// Must be greater than 0.</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If <paramref name="maxDegreeOfParallelism"/> is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            if (maxDegreeOfParallelism.Value < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
            }

            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            // Awaiting the action directly (rather than a ContinueWith continuation)
                            // lets its exception propagate to Task.WhenAll below.
                            await action(item);
                        }
                        finally
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        }
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler);
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

'using' will not help. The foreach loop will be waiting for the semaphore indefinitely. Just try this simple code that reproduces the issue: await Enumerable.Range(1, 4).ForEachAsyncConcurrent(async (i) => { Console.WriteLine(i); throw new Exception("test exception"); }, maxDegreeOfParallelism: 2);
@nicolay.anykienko you are right about #2. That memory problem can be solved by adding tasksWithThrottler.RemoveAll(x => x.IsCompleted);
I've tried it in my code, and if maxDegreeOfParallelism is not null the code deadlocks. Here you can see all the code to reproduce it: stackoverflow.com/questions/58793118/…
My concern with this approach when I looked at implementing it for my use, was the 1.7 million rows I was processing would result in each having a job in the tasksWithThrottler List, and that didn't seem ideal or really scalable. Posting the solution my teammate and I came up with using ActionBlock as a separate solution.
Please add cancellation token support as well.
Tom

In the accepted answer the ConcurrentBag is not required. Here's an implementation without it:

var tasks = myCollection.Select(GetData).ToList();
// Task.WhenAll returns the results in the same order as the tasks.
var results = await Task.WhenAll(tasks);

Any of the "// some pre stuff" and "// some post stuff" can go into the GetData implementation (or another method that calls GetData)

Aside from being shorter, it needs no async lambda and no shared collection.
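A small sketch of why no shared collection is needed: Task.WhenAll<TResult> returns the results in the order of the input tasks, not in completion order. GetDataAsync is a hypothetical stand-in for GetData in which later items finish first:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedResults
{
    // Hypothetical stand-in for GetData: item 0 waits 100 ms, item 4 only 20 ms,
    // so the tasks complete in reverse order.
    public static async Task<string> GetDataAsync(int item)
    {
        await Task.Delay(100 / (item + 1));
        return $"item {item}";
    }

    public static async Task<string[]> GetAllAsync(IEnumerable<int> items)
    {
        // The result array follows the order of the input tasks.
        return await Task.WhenAll(items.Select(GetDataAsync));
    }
}
```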


Alexei Levenkov

The following is set to work with IAsyncEnumerable but can be modified to use IEnumerable by just changing the type and removing the "await" on the foreach. It's far more appropriate for large sets of data than creating countless parallel tasks and then awaiting them all.

    public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        ActionBlock<T> block = new ActionBlock<T>(
           action, 
           new ExecutionDataflowBlockOptions 
           { 
             MaxDegreeOfParallelism = maxDegreeOfParallelism, 
             BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
           });

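        await foreach (T item in enumerable)
        {
           // SendAsync returns false once the block has faulted and declines items,
           // so stop enumerating instead of feeding a dead block.
           if (!await block.SendAsync(item).ConfigureAwait(false)) break;
        }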

        block.Complete();
        await block.Completion;
    }
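The IEnumerable<T> modification described above might look like this sketch, assuming the System.Threading.Tasks.Dataflow NuGet package (DataflowForEach is a hypothetical class name). It also stops feeding the block once SendAsync returns false, which happens after the block has faulted, so an early exception does not cause the rest of the source to be enumerated for nothing:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

public static class DataflowForEach
{
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int maxDegreeOfParallelism,
        int? boundedCapacity = null)
    {
        var block = new ActionBlock<T>(action, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
        });

        foreach (T item in enumerable)
        {
            // Waits asynchronously while the bounded buffer is full;
            // returns false if the block has faulted.
            if (!await block.SendAsync(item).ConfigureAwait(false)) break;
        }

        block.Complete();
        // Rethrows the action's exception if the block faulted.
        await block.Completion.ConfigureAwait(false);
    }
}
```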

You should probably replace the semaphore.Wait() with await semaphore.WaitAsync(), to avoid blocking the caller. Also be aware that the functionality of the SemaphoreSlim in your solution can be replaced by the BoundedCapacity configuration of the ActionBlock, in combination with the awaitable SendAsync method. Comparatively it is more efficient (memory-wise).
@TheodorZoulias Thanks so much for the feedback! It's something I'm actively working on for a project so I'll look at those changes and update my solution.
stackoverflow.com/a/65251949/477420 answer by @TheodorZoulias shows very similar approach... presumably SendAsync does not wait for operation to finish (which is not clear to me from the docs)
@CalebHolt, another gotcha that you may want to be aware of is that enumerating the user-supplied enumerable could potentially result in an exception, and in this case your implementation will immediately propagate this exception, without awaiting the completion of the ActionBlock. This is not an optimal behavior, because it may leave tasks running in the background unobserved (in fire-and-forget fashion). Implementing correctly a ForEachAsync method can be quite tricky. I became aware of the gotcha myself very recently.
@AlexeiLevenkov the documentation of the SendAsync method is quite confusing. I doubt that a person smart enough has ever existed on this planet, that could understand what this method does by just reading the docs. One should delve deep into the source code and understand that both Post and SendAsync methods are based on the hidden (explicitly implemented) OfferMessage API, that has 5 possible return values. The SendAsync handles asynchronously the Postponed return value.
Gravity API

For a simpler solution (not sure if it's the most optimal), you can simply nest Parallel.ForEach inside a Task, as such:

var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
// Keep a reference to the Task so that failures can be observed later.
var backgroundTask = Task.Run(() =>
{
    Parallel.ForEach(myCollection, options, item =>
    {
        DoWork(item);
    });
});

The ParallelOptions will do the throttling for you, out of the box.

I am using it in a real-world scenario to run very long operations in the background. These operations are triggered via HTTP, and it was designed not to block the HTTP call while the long operation is running:

1. An HTTP call requests a long background operation.
2. The operation starts in the background.
3. The user gets a status ID, which can be used to check the status with another HTTP call.
4. The background operation updates its status.

That way, the CI/CD call does not time out because of a long HTTP operation; rather, it polls the status every x seconds without blocking the process.
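A minimal in-memory sketch of that start-then-poll flow, assuming a single process: the BackgroundJobs class, its status strings and its method names are hypothetical, and a real service would persist statuses and expose Start/GetStatus through HTTP endpoints. Unlike a bare Task.Run, the background work records failures in the status instead of leaving them unobserved:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BackgroundJobs
{
    private static readonly ConcurrentDictionary<Guid, string> Statuses = new();

    // Returns a job ID immediately; the work runs on the thread pool.
    public static Guid Start<T>(IEnumerable<T> items, Action<T> doWork, int maxDegreeOfParallelism)
    {
        var id = Guid.NewGuid();
        Statuses[id] = "Running";

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        _ = Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(items, options, doWork);
                Statuses[id] = "Completed";
            }
            catch (Exception ex)
            {
                // Parallel.ForEach wraps failures in an AggregateException.
                Statuses[id] = "Failed: " + ex.GetBaseException().Message;
            }
        });

        return id;
    }

    // What a status endpoint would return for a given job ID.
    public static string GetStatus(Guid id) =>
        Statuses.TryGetValue(id, out var status) ? status : "Unknown";
}
```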


Gravity I am sorry that I have to downvote your answer, but passing an async delegate to the Parallel.ForEach method is more than "not a best practice". It is deeply and irrecoverably flawed. The Parallel.ForEach does not understand async delegates, so the lambda is async void. It's not fire-and-forget, it's fire-and-crash. In such a case the Parallel.ForEach will not wait for the completion of the launched operations, it will not enforce a maximum degree of parallelism, and it will not propagate exceptions. Any exception will be unhandled and will crash the process.
Gravity this is a bad example. Parallelizing the Console.WriteLine method makes no sense, because this method is synchronized. Only one thread can write to the Console at a time. Also notice the ugliness of the Thread.Sleep(15000);. You added this line because otherwise the program would end before the completion of the async void operations launched uncontrollably by the misused Parallel.ForEach loop. This is not the correct way to write software.
Well, you can't expect good votes by presenting bad examples and indirectly promoting bad practices, whether you recommend them or not. How about removing all the bad stuff from your answer, and keeping the good stuff?
Gravity the phrase "You can use async lambda as well", connected with the Parallel.ForEach method, is an indisputable downvote by me. No amount of warnings before or after, or deletion indications like strikethrough, can make the presence of this phrase tolerable. I am talking exclusively about my own voting criteria. Anyone else can vote however they see fit.
Accepted, and it makes sense. I hadn't thought about it that way; I agree with your criteria and have changed my post accordingly.