使用 ForEach 时是否可以使用异步?下面是我正在尝试的代码:
using (DataContext db = new DataLayer.DataContext())
{
db.Groups.ToList().ForEach(i => async {
await GetAdminsFromGroup(i.Gid);
});
}
我收到错误消息:
当前上下文中不存在名称“异步”
包含 using 语句的方法设置为异步。
List<T>.ForEach
不能与 async
配合得特别好(LINQ-to-objects 也不能,原因相同)。
在这种情况下,我建议将每个元素投射到异步操作中,然后您可以(异步)等待它们全部完成。
using (DataContext db = new DataLayer.DataContext())
{
var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid));
var results = await Task.WhenAll(tasks);
}
与将 async
委托给 ForEach
相比,这种方法的好处是:
错误处理更合适。来自 async void 的异常不能被 catch 捕获;这种方法将在 await Task.WhenAll 行传播异常,允许自然的异常处理。您知道任务在此方法结束时完成,因为它执行 await Task.WhenAll。如果使用 async void,则无法轻易判断操作何时完成。这种方法具有用于检索结果的自然语法。 GetAdminsFromGroupAsync 听起来像是一个产生结果的操作(管理员),如果这样的操作可以返回它们的结果而不是设置一个值作为副作用,那么这样的代码会更自然。
这个小扩展方法应该为您提供异常安全的异步迭代:
public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func)
{
foreach (var value in list)
{
await func(value);
}
}
由于我们将 lambda 的返回类型从 void
更改为 Task
,因此异常将正确传播。这将允许您在实践中编写如下内容:
await db.Groups.ToList().ForEachAsync(async i => {
await GetAdminsFromGroup(i.Gid);
});
someList.ForeEach(async item => { await SomeFunctionAsync(); })
将我的代码更改为您的方法时,我的数据会从数据库中正确加载。当在这里使用我的方法时,列表中的数据不会显示在我的结果中。你知道为什么吗?
从 C# 8.0
开始,您可以异步创建和使用流。
private async void button1_Click(object sender, EventArgs e)
{
IAsyncEnumerable<int> enumerable = GenerateSequence();
await foreach (var i in enumerable)
{
Debug.WriteLine(i);
}
}
public static async IAsyncEnumerable<int> GenerateSequence()
{
for (int i = 0; i < 20; i++)
{
await Task.Delay(100);
yield return i;
}
}
MoveNext
。这在枚举器无法立即获取下一个元素并且必须等待一个可用的情况下很重要。
简单的答案是使用 foreach
关键字而不是 List()
的 ForEach()
方法。
using (DataContext db = new DataLayer.DataContext())
{
foreach(var i in db.Groups)
{
await GetAdminsFromGroup(i.Gid);
}
}
List()
的 ForEach()
方法?
public void ForEach (Action<T> action)
@taylorswiftfan。为了让它工作,它需要是 ForEach(Task<T> action)
。由于我们没有通过普通的 ol' for 循环传递委托,因此我们可以等待循环内的每个调用。不过,现在有更好的方法。 stackoverflow.com/a/58112039/3198973
这是上述异步 foreach 变体的实际工作版本,具有顺序处理:
public static async Task ForEachAsync<T>(this List<T> enumerable, Action<T> action)
{
foreach (var item in enumerable)
await Task.Run(() => { action(item); }).ConfigureAwait(false);
}
这是实现:
public async void SequentialAsync()
{
var list = new List<Action>();
Action action1 = () => {
//do stuff 1
};
Action action2 = () => {
//do stuff 2
};
list.Add(action1);
list.Add(action2);
await list.ForEachAsync();
}
关键区别是什么? .ConfigureAwait(false);
在每个任务的异步顺序处理时保持主线程的上下文。
问题是 async
关键字需要出现在 lambda 之前,而不是在正文之前:
db.Groups.ToList().ForEach(async (i) => {
await GetAdminsFromGroup(i.Gid);
});
async void
的不必要且微妙的使用。这种方法在异常处理和知道异步操作何时完成方面存在问题。
添加此扩展方法
public static class ForEachAsyncExtension
{
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
while (partition.MoveNext())
await body(partition.Current).ConfigureAwait(false);
}));
}
}
然后像这样使用:
Task.Run(async () =>
{
var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
var buckets = await s3.ListBucketsAsync();
foreach (var s3Bucket in buckets.Buckets)
{
if (s3Bucket.BucketName.StartsWith("mybucket-"))
{
log.Information("Bucket => {BucketName}", s3Bucket.BucketName);
ListObjectsResponse objects;
try
{
objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
}
catch
{
log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName);
continue;
}
// ForEachAsync (4 is how many tasks you want to run in parallel)
await objects.S3Objects.ForEachAsync(4, async s3Object =>
{
try
{
log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key);
await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
}
catch
{
log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key);
}
});
try
{
await s3.DeleteBucketAsync(s3Bucket.BucketName);
}
catch
{
log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName);
}
}
}
}).Wait();
如果您使用的是 EntityFramework.Core,则有 an extension method ForEachAsync。
示例用法如下所示:
using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;
public class Example
{
private readonly DbContext _dbContext;
public Example(DbContext dbContext)
{
_dbContext = dbContext;
}
public async void LogicMethod()
{
await _dbContext.Set<dbTable>().ForEachAsync(async x =>
{
//logic
await AsyncTask(x);
});
}
public async Task<bool> AsyncTask(object x)
{
//other logic
return await Task.FromResult<bool>(true);
}
}
这是我为使用 ForEach
处理异步场景而创建的方法。
如果其中一项任务失败,则其他任务将继续执行。
您可以添加将在每个异常上执行的函数。
异常在最后被收集为 aggregateException 并且可供您使用。
可以处理 CancellationToken
public static class ParallelExecutor
{
/// <summary>
/// Executes asynchronously given function on all elements of given enumerable with task count restriction.
/// Executor will continue starting new tasks even if one of the tasks throws. If at least one of the tasks throwed exception then <see cref="AggregateException"/> is throwed at the end of the method run.
/// </summary>
/// <typeparam name="T">Type of elements in enumerable</typeparam>
/// <param name="maxTaskCount">The maximum task count.</param>
/// <param name="enumerable">The enumerable.</param>
/// <param name="asyncFunc">asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param>
/// <param name="onException">Acton that will be executed on every exception that would be thrown by asyncFunc. CAN be thread unsafe.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default)
{
using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);
// This `lockObject` is used only in `catch { }` block.
object lockObject = new object();
var exceptions = new List<Exception>();
var tasks = new Task[enumerable.Count()];
int i = 0;
try
{
foreach (var t in enumerable)
{
await semaphore.WaitAsync(cancellationToken);
tasks[i++] = Task.Run(
async () =>
{
try
{
await asyncFunc(t);
}
catch (Exception e)
{
if (onException != null)
{
lock (lockObject)
{
onException.Invoke(e);
}
}
// This exception will be swallowed here but it will be collected at the end of ForEachAsync method in order to generate AggregateException.
throw;
}
finally
{
semaphore.Release();
}
}, cancellationToken);
if (cancellationToken.IsCancellationRequested)
{
break;
}
}
}
catch (OperationCanceledException e)
{
exceptions.Add(e);
}
foreach (var t in tasks)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}
// Exception handling in this case is actually pretty fast.
// https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
try
{
await t;
}
#pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.
catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
{
exceptions.Add(e);
}
}
if (exceptions.Any())
{
throw new AggregateException(exceptions);
}
}
}
Count()
,源 IEnumerable<T>
被枚举了两次,这是一个禁忌。枚举可能会在每次枚举时访问数据库/文件系统/网络。 2. Task.Factory.StartNew(async
产生一个 Task<Task>
,而您只等待外部 Task
。该方法可能不会传播所有异常,并且可能在所有任务完成之前返回。并且 3. 使用 cancellationToken.IsCancellationRequested
可能会导致不一致的取消行为。请改用 ThrowIfCancellationRequested
。
Action<Exception> onException = null
参数,它的用例是什么?是在错误发生时立即记录错误,而不是在 ForEachAsync
方法完成后批量记录它们吗?
Task.Run
,它不会吞下它们。
我想补充一点,有一个内置 ForEach 函数的 Parallel class 可用于此目的。
Parallel
类 is not async-friendly。
Parallel
类非常适合 CPU 密集型工作,但不适用于 I/O 密集型工作。例如,当您进行网络请求时的 you don't really need threads。
List.ForEach()
不是 LINQ 的一部分。async
的所有回答。他们非常有帮助!await
执行foreach
。ForEach
仅采用同步委托类型,并且没有采用异步委托类型的重载。所以简短的回答是“没有人写过异步ForEach
”。更长的答案是您必须假设一些语义;例如,应该一次处理一个项目(如foreach
)还是同时处理(如Select
)?如果一次一个,异步流不是更好的解决方案吗?如果同时,结果应该按原始项目顺序还是按完成顺序?它应该在第一次失败时失败还是等到所有都完成?等等。SemaphoreSlim
限制异步任务。