ChatGPT解决这个技术问题 Extra ChatGPT

如何将异步与 ForEach 一起使用?

使用 ForEach 时是否可以使用异步?下面是我正在尝试的代码:

using (DataContext db = new DataLayer.DataContext())
{
    db.Groups.ToList().ForEach(i => async {
        await GetAdminsFromGroup(i.Gid);
    });
}

我收到错误消息:

当前上下文中不存在名称“异步”

包含 using 语句的方法设置为异步。


S
Stephen Cleary

List<T>.ForEach 不能与 async 配合得特别好(LINQ-to-objects 也不能,原因相同)。

在这种情况下,我建议将每个元素投射到异步操作中,然后您可以(异步)等待它们全部完成。

using (DataContext db = new DataLayer.DataContext())
{
    var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid));
    var results = await Task.WhenAll(tasks);
}

与将 async 委托给 ForEach 相比,这种方法的好处是:

错误处理更合适。来自 async void 的异常不能被 catch 捕获;这种方法将在 await Task.WhenAll 行传播异常,允许自然的异常处理。您知道任务在此方法结束时完成,因为它执行 await Task.WhenAll。如果使用 async void,则无法轻易判断操作何时完成。这种方法具有用于检索结果的自然语法。 GetAdminsFromGroupAsync 听起来像是一个产生结果的操作(管理员),如果这样的操作可以返回它们的结果而不是设置一个值作为副作用,那么这样的代码会更自然。


并不是说它会改变任何东西,但 List.ForEach() 不是 LINQ 的一部分。
@StephenCleary 的好建议,感谢您对 async 的所有回答。他们非常有帮助!
@StewartAnderson:任务将同时执行。串行执行没有扩展;只需在循环主体中使用 await 执行 foreach
@mare:ForEach 仅采用同步委托类型,并且没有采用异步委托类型的重载。所以简短的回答是“没有人写过异步ForEach”。更长的答案是您必须假设一些语义;例如,应该一次处理一个项目(如foreach)还是同时处理(如Select)?如果一次一个,异步流不是更好的解决方案吗?如果同时,结果应该按原始项目顺序还是按完成顺序?它应该在第一次失败时失败还是等到所有都完成?等等。
@RogerWolf:是的;使用 SemaphoreSlim 限制异步任务。
J
JD Courtoy

这个小扩展方法应该为您提供异常安全的异步迭代:

public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func)
{
    foreach (var value in list)
    {
        await func(value);
    }
}

由于我们将 lambda 的返回类型从 void 更改为 Task,因此异常将正确传播。这将允许您在实践中编写如下内容:

await db.Groups.ToList().ForEachAsync(async i => {
    await GetAdminsFromGroup(i.Gid);
});

除了等待 ForEachAsyn(),还可以调用 Wait()。
Lambda 不需要在这里等待。
我会在此处添加对 CancellationToken 的支持,如托德的回答stackoverflow.com/questions/29787098/…
出于好奇:您在示例中执行 foreach 与此有什么区别:someList.ForeEach(async item => { await SomeFunctionAsync(); }) 将我的代码更改为您的方法时,我的数据会从数据库中正确加载。当在这里使用我的方法时,列表中的数据不会显示在我的结果中。你知道为什么吗?
A
Andrei Krasutski

C# 8.0 开始,您可以异步创建和使用流。

    private async void button1_Click(object sender, EventArgs e)
    {
        IAsyncEnumerable<int> enumerable = GenerateSequence();

        await foreach (var i in enumerable)
        {
            Debug.WriteLine(i);
        }
    }

    public static async IAsyncEnumerable<int> GenerateSequence()
    {
        for (int i = 0; i < 20; i++)
        {
            await Task.Delay(100);
            yield return i;
        }
    }

More


这样做的好处是除了等待每个元素之外,您现在还等待枚举器的 MoveNext。这在枚举器无法立即获取下一个元素并且必须等待一个可用的情况下很重要。
这缺乏解释/示例如何将其应用于 OPs 问题。
R
RubberDuck

简单的答案是使用 foreach 关键字而不是 List()ForEach() 方法。

using (DataContext db = new DataLayer.DataContext())
{
    foreach(var i in db.Groups)
    {
        await GetAdminsFromGroup(i.Gid);
    }
}

为什么这个工作而不是使用 List()ForEach() 方法?
签名是 public void ForEach (Action<T> action) @taylorswiftfan。为了让它工作,它需要是 ForEach(Task<T> action)。由于我们没有通过普通的 ol' for 循环传递委托,因此我们可以等待循环内的每个调用。不过,现在有更好的方法。 stackoverflow.com/a/58112039/3198973
@RubberDuck 每当您这样说时,答案都会详细说明处理此问题的现代方式。那么您对 Entity Framework Core 或其他东西感兴趣吗?
@PeterCsala 我不知道你在说什么。我用这个赏金奖励现有的答案。这个。 stackoverflow.com/a/58112039/3198973
我实际上认为这是最简单的方法
m
mrogunlana

这是上述异步 foreach 变体的实际工作版本,具有顺序处理:

public static async Task ForEachAsync<T>(this List<T> enumerable, Action<T> action)
{
    foreach (var item in enumerable)
        await Task.Run(() => { action(item); }).ConfigureAwait(false);
}

这是实现:

public async void SequentialAsync()
{
    var list = new List<Action>();

    Action action1 = () => {
        //do stuff 1
    };

    Action action2 = () => {
        //do stuff 2
    };

    list.Add(action1);
    list.Add(action2);

    await list.ForEachAsync();
}

关键区别是什么? .ConfigureAwait(false); 在每个任务的异步顺序处理时保持主线程的上下文。


K
Kirk Woll

问题是 async 关键字需要出现在 lambda 之前,而不是在正文之前:

db.Groups.ToList().ForEach(async (i) => {
    await GetAdminsFromGroup(i.Gid);
});

这是对 async void 的不必要且微妙的使用。这种方法在异常处理和知道异步操作何时完成方面存在问题。
是的,我发现这不能正确处理异常。
这样,如果发生任何未处理的异常,它将破坏整个流程。
s
superlogical

添加此扩展方法

public static class ForEachAsyncExtension
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop) 
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ConfigureAwait(false);
            }));
    }
}

然后像这样使用:

Task.Run(async () =>
{
    var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
    var buckets = await s3.ListBucketsAsync();

    foreach (var s3Bucket in buckets.Buckets)
    {
        if (s3Bucket.BucketName.StartsWith("mybucket-"))
        {
            log.Information("Bucket => {BucketName}", s3Bucket.BucketName);

            ListObjectsResponse objects;
            try
            {
                objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName);
                continue;
            }

            // ForEachAsync (4 is how many tasks you want to run in parallel)
            await objects.S3Objects.ForEachAsync(4, async s3Object =>
            {
                try
                {
                    log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key);
                    await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
                }
                catch
                {
                    log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key);
                }
            });

            try
            {
                await s3.DeleteBucketAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName);
            }
        }
    }
}).Wait();

E
ElConrado

如果您使用的是 EntityFramework.Core,则有 an extension method ForEachAsync

示例用法如下所示:

using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;

public class Example
{
    private readonly DbContext _dbContext;
    public Example(DbContext dbContext)
    {
        _dbContext = dbContext;
    }
    public async void LogicMethod()
    {
        
        await _dbContext.Set<dbTable>().ForEachAsync(async x =>
        {
            //logic
            await AsyncTask(x);
        });
    }

    public async Task<bool> AsyncTask(object x)
    {
        //other logic
        return await Task.FromResult<bool>(true);
    }
}

S
Shoter

这是我为使用 ForEach 处理异步场景而创建的方法。

如果其中一项任务失败,则其他任务将继续执行。

您可以添加将在每个异常上执行的函数。

异常在最后被收集为 aggregateException 并且可供您使用。

可以处理 CancellationToken

 public static class ParallelExecutor
    {
        /// <summary>
        /// Executes asynchronously given function on all elements of given enumerable with task count restriction.
        /// Executor will continue starting new tasks even if one of the tasks throws. If at least one of the tasks throwed exception then <see cref="AggregateException"/> is throwed at the end of the method run.
        /// </summary>
        /// <typeparam name="T">Type of elements in enumerable</typeparam>
        /// <param name="maxTaskCount">The maximum task count.</param>
        /// <param name="enumerable">The enumerable.</param>
        /// <param name="asyncFunc">asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param>
        /// <param name="onException">Acton that will be executed on every exception that would be thrown by asyncFunc. CAN be thread unsafe.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default)
        {
            using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);

            // This `lockObject` is used only in `catch { }` block.
            object lockObject = new object();
            var exceptions = new List<Exception>();
            var tasks = new Task[enumerable.Count()];
            int i = 0;

            try
            {
                foreach (var t in enumerable)
                {
                    await semaphore.WaitAsync(cancellationToken);
                    tasks[i++] = Task.Run(
                        async () =>
                        {
                            try
                            {
                                await asyncFunc(t);
                            }
                            catch (Exception e)
                            {
                                if (onException != null)
                                {
                                    lock (lockObject)
                                    {
                                        onException.Invoke(e);
                                    }
                                }

                                // This exception will be swallowed here but it will be collected at the end of ForEachAsync method in order to generate AggregateException.
                                throw;
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        }, cancellationToken);

                    if (cancellationToken.IsCancellationRequested)
                    {
                        break;
                    }
                }
            }
            catch (OperationCanceledException e)
            {
                exceptions.Add(e);
            }

            foreach (var t in tasks)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                // Exception handling in this case is actually pretty fast.
                // https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
                try
                {
                    await t;
                }
#pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.
                catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
                {
                    exceptions.Add(e);
                }
            }

            if (exceptions.Any())
            {
                throw new AggregateException(exceptions);
            }
        }
    }

您的实施存在一些问题。 1. 由于 Count(),源 IEnumerable<T> 被枚举了两次,这是一个禁忌。枚举可能会在每次枚举时访问数据库/文件系统/网络。 2. Task.Factory.StartNew(async 产生一个 Task<Task>,而您只等待外部 Task。该方法可能不会传播所有异常,并且可能在所有任务完成之前返回。并且 3. 使用 cancellationToken.IsCancellationRequested 可能会导致不一致的取消行为。请改用 ThrowIfCancellationRequested
关于 Action<Exception> onException = null 参数,它的用例是什么?是在错误发生时立即记录错误,而不是在 ForEachAsync 方法完成后批量记录它们吗?
你是对的,有例外。现在我使用 Task.Run,它不会吞下它们。
@TheodorZoulias 感谢您帮助我改进该设计。做得好!我真的很感激:)
谢谢四位见识。下次我要看看 TPL 库。谢谢指导! @TheodorZoulias
L
Luk164

我想补充一点,有一个内置 ForEach 函数的 Parallel class 可用于此目的。


Parallelis not async-friendly
@TheodorZoulias 我看到了。我认为它会达到类似的结果 - 尽可能快地使用多个线程来处理列表。我误解了预期的结果。
Parallel 类非常适合 CPU 密集型工作,但不适用于 I/O 密集型工作。例如,当您进行网络请求时的 you don't really need threads