ChatGPT解决这个技术问题 Extra ChatGPT

Apache Spark:核心数与执行程序数

我试图了解在 YARN 上运行 Spark 作业时核心数和执行程序数之间的关系。

测试环境如下:

数据节点数:3

数据节点机器规格: CPU:Core i7-4790(核心数:4,线程数:8) RAM:32GB (8GB x 4) HDD:8TB (2TB x 4)

CPU:Core i7-4790(核心数:4,线程数:8)

内存:32GB(8GB x 4)

硬盘:8TB (2TB x 4)

网络:1Gb

火花版本:1.0.0

Hadoop 版本:2.4.0(Hortonworks HDP 2.1)

Spark 作业流程:sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile

输入数据类型:单个文本文件大小:165GB 行数:454,568,833

类型:单个文本文件

大小:165GB

行数:454,568,833

输出第二个过滤器后的行数:310,640,717 结果文件的行数:99,848,268 结果文件的大小:41GB

第二次过滤后的行数:310,640,717

结果文件行数:99,848,268

结果文件大小:41GB

该作业使用以下配置运行:

--master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3(每个数据节点的executors,使用与cores一样多) --master yarn-client --executor-memory 19G -- executor-cores 4 --num-executors 3(减少核心数量)--master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12(更少核心,更多执行者)

经过时间:

50 分 15 秒 55 分 48 秒 31 分 23 秒

令我惊讶的是,(3)要快得多。我认为(1)会更快,因为洗牌时执行者之间的沟通会更少。尽管 (1) 的核心数少于 (3),但核心数不是关键因素,因为 2) 确实表现良好。

(在 pwilmot 的回答之后添加了以下内容。)

有关信息,性能监视器屏幕截图如下:

(1) 的神经节数据节点摘要 - 作业于 04:37 开始。

https://i.stack.imgur.com/bVnyJ.png

(3) 的神经节数据节点摘要 - 作业于 19:47 开始。请忽略该时间之前的图表。

https://i.stack.imgur.com/2K2Kp.png

该图大致分为两部分:

第一:从开始到reduceByKey:CPU密集,没有网络活动

第二:reduceByKey: CPU 降低后,网络 I/O 完成。

如图所示,(1) 可以使用尽可能多的 CPU 功率。所以,可能不是线程数的问题。

如何解释这个结果?

现在我怀疑 GC ......事实上,在 Spark UI 上,GC 花费的总时间在 1) 上比 2) 上更长。
您为什么不尝试 3) 使用 19G?是否将工人限制在 4G 上会降低某些人的 NUMA 效应?即,您的 4G 位于分配给您的工作流程的 2 个核心之一上,因此 i/o 减速更少,从而带来更好的整体性能。否则我认为一个主要问题是:有多少核心/线程可以在一个工作人员上使用一个执行程序? (只能指定一个worker的总核数,不能指定executor的粒度)
顺便说一句,我刚刚检查了 core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 的代码,似乎 1 executor = 1 worker 线程。
有点晚了,但这里有一篇关于这个主题的关于 cloudera 的帖子:blog.cloudera.com/blog/2015/03/…
顺便说一句,我在 cloudera 幻灯片 slideshare.net/cloudera/… 中找到了这些信息,它解释了一些关于执行器、核心和内存的决策

C
Community

为了希望使这一切更加具体,这里有一个配置 Spark 应用程序以使用尽可能多的集群的工作示例:想象一个集群有六个运行 NodeManagers 的节点,每个节点配备 16 个内核和 64GB 内存。 NodeManager 容量,yarn.nodemanager.resource.memory-mb 和 yarn.nodemanager.resource.cpu-vcores,应该分别设置为 63 * 1024 = 64512(兆字节)和 15。我们避免将 100% 的资源分配给 YARN 容器,因为节点需要一些资源来运行操作系统和 Hadoop 守护进程。在这种情况下,我们为这些系统进程留下一个千兆字节和一个内核。 Cloudera Manager 通过考虑这些并自动配置这些 YARN 属性来提供帮助。可能的第一个冲动是使用 --num-executors 6 --executor-cores 15 --executor-memory 63G。但是,这是错误的方法,因为:63GB + 执行程序内存开销不适合 NodeManagers 的 63GB 容量。应用程序主控将在其中一个节点上占用一个核心,这意味着该节点上将没有空间容纳 15 核执行器。每个执行器 15 个内核会导致 HDFS I/O 吞吐量不佳。更好的选择是使用 --num-executors 17 --executor-cores 5 --executor-memory 19G。为什么?此配置会在所有节点上产生三个执行程序,但带有 AM 的节点除外,它将有两个执行程序。 --executor-memory 派生为(每个节点 63/3 个执行器)= 21. 21 * 0.07 = 1.47。 21 – 1.47 ~ 19。

Cloudera 的博客 How-to: Tune Your Apache Spark Jobs (Part 2) 中的一篇文章中给出了解释。


“这个配置会在所有节点上产生三个执行器,除了带有 AM 的那个,它将有两个执行器。”。这对“--executor-cores 5”意味着什么?
这意味着每个执行器使用 5 个核心。每个节点有 3 个执行器,因此使用 15 个核心,除了其中一个节点还将为作业运行应用程序主控器,因此只能托管 2 个执行器,即 10 个核心用作执行器。
很好解释 - 请注意,这适用于 yarn.scheduler.capacity.resource-calculator 禁用,这是默认设置。这是因为默认情况下它按内存而不是 CPU 进行调度。
更多的执行器会导致 HDFS I/O 吞吐量不佳。因此,如果我根本不使用 HDFS,在这种情况下,每个执行程序可以使用超过 5 个内核吗?
我虽然应用程序主机在每个节点上运行。如上所述,这意味着只有 1 个 Application Master 来运行该作业。那是对的吗?
t
turtlemonvh

简短回答:我认为 tgbaggio 是对的。您达到了执行程序的 HDFS 吞吐量限制。

我认为这里的答案可能比这里的一些建议要简单一些。

我的线索在集群网络图中。对于运行 1,利用率稳定在 ~50 M 字节/秒。对于运行 3,稳定利用率翻了一番,约为 100 M 字节/秒。

DzOrd 分享的 the cloudera blog post 中,您可以看到以下重要引述:

我注意到 HDFS 客户端在处理大量并发线程时遇到了问题。一个粗略的猜测是,每个 executor 最多五个任务可以实现完整的写入吞吐量,因此最好将每个 executor 的核心数保持在该数字以下。

所以,让我们做一些计算,看看如果这是真的,我们期望什么性能。

运行 1:19 GB,7 个内核,3 个执行程序

个执行器 x 7 个线程 = 21 个线程

每个执行程序有 7 个内核,我们预计 HDFS 的 IO 有限(最多约 5 个内核)

有效吞吐量 ~= 3 执行器 x 5 线程 = 15 线程

运行 3:4 GB,2 个内核,12 个执行器

个执行器 x 12 个线程 = 24 个线程

每个执行器 2 个核心,因此 hdfs 吞吐量还可以

有效吞吐量 ~= 12 执行器 x 2 线程 = 24 线程

如果作业 100% 受并发(线程数)限制。我们希望运行时与线程数完全成反比。

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

所以 ratio_num_threads ~= inv_ratio_runtime,看起来我们是网络受限的。

同样的效果解释了 Run 1 和 Run 2 之间的区别。

运行 2:19 GB,4 个内核,3 个执行程序

个执行器 x 4 个线程 = 12 个线程

每个执行器有 4 个内核,可以 IO 到 HDFS

有效吞吐量 ~= 3 执行器 x 4 线程 = 12 线程

比较有效线程数和运行时:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

它不像上次比较那样完美,但是当我们失去线程时,我们仍然看到类似的性能下降。

现在是最后一点:为什么我们会通过更多线程获得更好的性能,尤其是。线程数多于 CPU 数?

Rob Pike 的这篇精彩文章很好地解释了并行性(我们通过将数据划分到多个 CPU 上得到的)和并发性(当我们使用多个线程在单个 CPU 上工作时得到的)之间的区别:{ 1}。

简短的解释是,如果 Spark 作业与文件系统或网络交互,CPU 会花费大量时间等待与这些接口的通信,而不是花费大量时间实际“工作”。通过让这些 CPU 一次处理超过 1 个任务,它们等待的时间更少,工作的时间更多,你会看到更好的性能。


有趣且令人信服的解释,我想知道您是如何猜测执行程序有 5 个任务限制以实现最大吞吐量的。
所以数字 5 不是我想出来的:我只是注意到 IO 瓶颈的迹象,然后开始寻找这些瓶颈可能来自哪里。
t
tgbaggio

根据 Sandy Ryza,当您在 HDFS 上运行 Spark 应用程序时

我注意到 HDFS 客户端在处理大量并发线程时遇到了问题。一个粗略的猜测是,每个 executor 最多五个任务可以实现完整的写入吞吐量,因此最好将每个 executor 的核心数保持在该数字以下。

所以我相信你的第一个配置比第三个配置慢是因为 HDFS I/O 吞吐量不好


p
pwilmot

我自己没有玩过这些设置,所以这只是推测,但如果我们将此问题视为分布式系统中的普通内核和线程,那么在您的集群中,您最多可以使用 12 个内核(4 * 3 台机器)和 24 个线程(8 * 3 台机器)。在您的前两个示例中,您为您的工作提供了相当数量的内核(潜在的计算空间),但是在这些内核上运行的线程(工作)数量非常有限,以至于您无法使用分配的大部分处理能力因此即使分配了更多的计算资源,作业也会变慢。

您提到您关心的是洗牌步骤 - 虽然限制洗牌步骤中的开销很好,但利用集群的并行化通常更为重要。考虑一下极端情况——零随机播放的单线程程序。


谢谢你的回答。但我怀疑线程数不是主要问题。我添加了监控屏幕截图。如图所示,1) 可以使用尽可能多的 CPU 功率。
@zeodtr pwilmot 是正确的 - 您至少需要 2-4 个任务才能充分利用核心的潜力。把它说成是 - 我通常为我的 80 个核心集群使用至少 1000 个分区。
@samthebest我想知道的是1)和3)之间性能差异的原因。当我观看 Spark UI 时,在第 2 部分中都并行运行了 21 个任务。(为什么在 3 的情况下是 21 而不是 24)目前尚不清楚)但是,3)的任务运行得更快。
C
Community

RStudio's Sparklyr package page 提供的优秀资源:

SPARK 定义:为 Spark 命名法提供一些简单的定义可能会很有用: 节点:服务器 工作节点:作为集群一部分的服务器,可用于运行 Spark 作业 主节点:协调工作节点的服务器。 Executor:节点内的一种虚拟机。一个节点可以有多个执行者。驱动节点:启动 Spark 会话的节点。通常,这将是 sparklyr 所在的服务器。 Driver(Executor):Driver Node也会出现在Executor列表中。


z
zwb

我认为主要原因之一是地方性。你的输入文件大小是165G,文件的相关块肯定分布在多个DataNode上,更多的executor可以避免网络复制。

尝试设置 executor num 等于块数,我认为可以更快。


H
Harikrishnan Ck

Spark 动态分配提供了灵活性并动态分配资源。在这个数量的 min 和 max executors 中可以给出。此外,还可以给出在应用程序启动时必须启动的执行程序的数量。

请阅读以下内容:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation


l
loneStar

我认为前两种配置存在一个小问题。线程和核心的概念如下。线程的概念是,如果核心是理想的,则使用该核心来处理数据。所以在前两种情况下内存没有被充分利用。如果要对这个示例进行基准测试,请选择每台机器上具有 10 个以上内核的机器。然后做基准测试。

但是不要为每个执行程序提供超过 5 个内核,否则 i/o 性能会出现瓶颈。

因此,进行此基准测试的最佳机器可能是具有 10 个内核的数据节点。

数据节点机器规格: CPU:Core i7-4790(核心数:10,线程数:20) RAM:32GB (8GB x 4) HDD:8TB (2TB x 4)