我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望能得到一些帮助。
如果我想在并行循环的 lambda 中调用 C# 中标记为 async 的方法,就会出现问题。例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
计数为 0 时会出现问题,因为创建的所有线程实际上只是后台线程,并且 Parallel.ForEach
调用不会等待完成。如果我删除 async 关键字,该方法如下所示:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
它可以工作,但它完全禁用了等待的聪明,我必须做一些手动异常处理..(为简洁起见已删除)。
如何实现在 lambda 中使用 await 关键字的 Parallel.ForEach
循环?可能吗?
Parallel.ForEach 方法的原型采用 Action<T>
作为参数,但我希望它等待我的异步 lambda。
await GetData(item)
中删除 await
,因为它会按原样产生编译错误。
如果你只想要简单的并行性,你可以这样做:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
如果您需要更复杂的内容,请查看 Stephen Toub's ForEachAsync
post。
您可以使用 AsyncEnumerator NuGet Package 中的 ParallelForEachAsync
扩展方法:
using Dasync.Collections;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
免责声明:我是 AsyncEnumerator 库的作者,该库是开源的并在 MIT 下获得许可,我发布此消息只是为了帮助社区。
maxDegreeOfParallelism
> maxDegreeOfParalellism
新的 .NET 6 API 之一是 Parallel.ForEachAsync,这是一种安排异步工作的方法,允许您控制并行度:
var urls = new []
{
"https://dotnet.microsoft.com",
"https://www.microsoft.com",
"https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);
var response = await client.GetAsync(url);
if (response.IsSuccessStatusCode)
{
using var target = File.OpenWrite(targetPath);
await response.Content.CopyToAsync(target);
}
});
另一个示例 in Scott Hanselman's blog。
The source,供参考。
使用 SemaphoreSlim
,您可以实现并行控制。
var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
await throttler.WaitAsync();
try
{
var response = await GetData(item);
bag.Add(response);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
var count = bag.Count;
using
语句包装,因为它实现了 IDisposable
从其他答案和公认的 asnwer 引用的文章中编译的最简单的可能扩展方法:
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync();
try
{
await asyncAction(item).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
更新:这是一个简单的修改,它还支持评论中请求的取消令牌(未经测试)
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync(cancellationToken);
if (cancellationToken.IsCancellationRequested) return;
try
{
await asyncAction(item, cancellationToken).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
int maxDegreeOfParallelism = 10
。最好让用户明确指定并发级别。您还可以考虑在 asyncAction(item)
之后添加一个 .ConfigureAwait(false)
(但不是在 throttler.WaitAsync()
之后)。
cancellationToken
的变体将在稍后发布。
Task.WhenAll
的解决方案只适用于数量相对较少的任务,或者适用于保证不抛出的asyncAction
。否则,等待半小时完成 10,000 个任务会非常令人沮丧,结果只是收到一个异常(这可能是由第一个任务引发的)。
throttler
未被处置。
我的 ParallelForEach 异步的轻量级实现。
特征:
节流(最大并行度)。异常处理(完成时会抛出聚合异常)。内存高效(无需存储任务列表)。
public static class AsyncEx
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
{
var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
var tcs = new TaskCompletionSource<object>();
var exceptions = new ConcurrentBag<Exception>();
bool addingCompleted = false;
foreach (T item in source)
{
await semaphoreSlim.WaitAsync();
asyncAction(item).ContinueWith(t =>
{
semaphoreSlim.Release();
if (t.Exception != null)
{
exceptions.Add(t.Exception);
}
if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.TrySetResult(null);
}
});
}
Volatile.Write(ref addingCompleted, true);
await tcs.Task;
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
}
}
使用示例:
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
tcs.SetResult(null)
需要替换为 tcs.TrySetResult(null)
SetResult
的问题)When to use SetResult() vs TrySetResult()
SemaphoreSlim
的 CurrentCount
属性来控制执行流程不是一个好主意。在大多数情况下,它会产生竞争条件。使用 Volatile.Read
也是不稳定的(另一种可能的竞争条件)。我不会在生产环境中相信这个解决方案。
我为此创建了一个扩展方法,它利用 SemaphoreSlim 并且还允许设置最大并行度
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
样品用法:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
在接受的答案中,不需要 ConcurrentBag。这是一个没有它的实现:
var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);
任何“// some pre stuff”和“// some post stuff”都可以进入GetData实现(或另一个调用GetData的方法)
除了更短之外,没有使用“async void”lambda,这是一种反模式。
以下设置为使用 IAsyncEnumerable
,但可以修改为使用 IEnumerable
,只需更改类型并删除 foreach
上的“等待”。它比创建无数并行任务然后等待它们全部更适合大量数据。
public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
{
ActionBlock<T> block = new ActionBlock<T>(
action,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
});
await foreach (T item in enumerable)
{
await block.SendAsync(item).ConfigureAwait(false);
}
block.Complete();
await block.Completion;
}
semaphore.Wait()
替换为 await semaphore.WaitAsync()
,以避免阻塞调用者。另请注意,您的解决方案中 SemaphoreSlim
的功能可以替换为 ActionBlock
的 BoundedCapacity
配置以及可等待的 SendAsync
方法。相比之下,它更有效(内存方面)。
enumerable
可能会导致异常,在这种情况下,您的实现将立即传播此异常,而无需等待 {2 }。这不是最佳行为,因为它可能会使在后台运行的任务不被观察(以即发即弃的方式)。正确实现 ForEachAsync
方法可能非常棘手。我最近才意识到这个问题。
SendAsync
方法的文档非常混乱。我怀疑这个星球上是否存在过足够聪明的人,仅仅通过阅读文档就可以理解这种方法的作用。应该深入研究源代码并了解 Post
和 SendAsync
方法都基于隐藏的(显式实现的)OfferMessage
API,它有 5 个可能的返回值。 SendAsync
异步处理 Postponed
返回值。
对于更简单的解决方案(不确定是否是最佳解决方案),您可以简单地将 Parallel.ForEach
嵌套在 Task
中 - 因此
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }
Task.Run(() =>
{
Parallel.ForEach(myCollection, options, item =>
{
DoWork(item);
}
}
ParallelOptions
将为您执行节流,开箱即用。
我在现实世界的场景中使用它在后台运行很长时间的操作。这些操作是通过 HTTP 调用的,它被设计为在长操作运行时不会阻塞 HTTP 调用。
调用 HTTP 进行长时间后台操作。操作从后台开始。用户获取状态 ID,可用于使用另一个 HTTP 调用检查状态。后台操作更新其状态。
这样,CI/CD 调用不会因为长时间的 HTTP 操作而超时,而是每 x 秒循环一次状态而不阻塞进程
Parallel.ForEach
方法不仅仅是“不是最佳实践”。它存在严重且不可挽回的缺陷。 Parallel.ForEach
不理解异步委托,因此 lambda 为 async void
。这不是一发不可收拾,而是一发不可收拾。在这种情况下,Parallel.ForEach
不会等待启动的操作完成,不会强制执行最大程度的并行,也不会传播异常。任何异常都将未被处理,并使进程崩溃。
Console.WriteLine
方法没有意义,因为此方法是同步的。一次只有一个线程可以写入 Console
。还要注意 Thread.Sleep(15000);
的丑陋。您添加了这一行,因为否则程序将在 async void
操作完成之前结束,该操作被滥用的 Parallel.ForEach
循环无法控制地启动。这不是编写软件的正确方法。
Parallel.ForEach
方法相关,是我无可争辩的反对意见。之前或之后的任何警告,或删除线之类的删除指示,都无法容忍此短语的存在。我只是在谈论我自己的投票标准。其他任何人都可以投票,但他们认为合适。
不定期副业成功案例分享
dop
任务,然后每个任务依次处理输入集合的某些子集。Task.Run
而没有await
结果,那么这只是将一劳永逸的工作扔到线程池中。这几乎总是一个错误。