ChatGPT解决这个技术问题 Extra ChatGPT

Spark 中的阶段是如何划分为任务的?

让我们假设在每个时间点只有一个 Spark 作业正在运行。

到目前为止我得到了什么

以下是我对 Spark 中发生的事情的理解:

创建 SparkContext 时,每个工作节点都会启动一个执行程序。执行器是独立的进程 (JVM),连接回驱动程序。每个执行器都有驱动程序的 jar。退出驱动程序,关闭执行程序。每个执行者可以持有一些分区。执行作业时,会根据沿袭图创建执行计划。执行作业分为多个阶段,其中阶段包含尽可能多的相邻(在沿袭图中)转换和动作,但没有洗牌。因此,阶段被洗牌分开。

https://i.stack.imgur.com/NT1MB.png

我明白那个

任务是通过序列化 Function 对象从驱动程序发送到执行程序的命令。执行器反序列化(使用驱动程序 jar)命令(任务)并在分区上执行它。

问题)

如何将阶段拆分为这些任务?

具体来说:

任务是由转换和操作确定的,还是可以是多个转换/操作在一个任务中?任务是否由分区确定(例如,每个分区每个阶段一个任务)。任务是否由节点确定(例如,每个节点每个阶段一个任务)?

我的想法(即使是正确的,也只是部分答案)

https://0x0fff.com/spark-architecture-shuffle 中,用图像解释了 shuffle

https://i.stack.imgur.com/KxhJE.png

我得到的印象是规则是

每个阶段分为#number-of-partitions 个任务,不考虑节点数

对于我的第一张图片,我会说我有 3 个 map 任务和 3 个 reduce 任务。

对于来自 0x0fff 的图像,我会说有 8 个 map 任务和 3 个 reduce 任务(假设只有三个橙色和三个深绿色文件)。

任何情况下的开放性问题

那是对的吗?但即使这是正确的,我上面的问题也没有全部回答,因为它仍然是开放的,多个操作(例如多个地图)是在一个任务内还是每个操作被分成一个任务。

别人怎么说

What is a task in Spark? How does the Spark worker execute the jar file?How does the Apache Spark scheduler split files into tasks? 相似,但我觉得我的问题没有得到明确的回答。

如果您能添加更多见解,将不胜感激,我有类似的问题。
@Nag:我的问题也在寻找更多见解,这就是我问的原因:-)。答案是否提供了您在哪里寻找的东西?您需要什么样的见解?
啊,明白了。我想,既然这个问题发布得有点老,也许你会对你提出的问题有一些见解。想和你核对:-)
@Nag:嗯,自从我上次使用 Spark 以来已经有几年了,所以 a)如果我想知道 Spark 是如何工作的(我忘记了大部分细节),我必须再次阅读 Spark 和 b)我写的内容可能已经过时,尤其是因为我的帖子主要涉及 Spark 1.x,而且 Spark 2.x 发生了很多变化,afai 记得。但也许与后端架构无关的变化 - 这也可能是真的。
伟大的。谢谢 !!

W
WestCoastProjects

你在这里有一个很好的大纲。回答您的问题

确实需要为每个阶段的每个数据分区启动一个单独的任务。考虑到每个分区可能驻留在不同的物理位置——例如 HDFS 中的块或本地文件系统的目录/卷。

请注意,Stage 的提交是由 DAG Scheduler 驱动的。这意味着可以将不相互依赖的阶段提交到集群以并行执行:这最大限度地提高了集群的并行化能力。因此,如果我们的数据流中的操作可以同时发生,我们将期望看到多个阶段启动。

我们可以在下面的玩具示例中看到这一点,我们在其中执行以下类型的操作:

加载两个数据源

分别对两个数据源执行一些映射操作

加入他们

对结果执行一些映射和过滤操作

保存结果

那么我们最终会有多少阶段呢?

1 个阶段,每个阶段用于并行加载两个数据源 = 2 个阶段

第三个阶段表示依赖于其他两个阶段的连接

注意:所有对连接数据进行的后续操作可以在同一阶段执行,因为它们必须顺序发生。启动额外的阶段没有任何好处,因为在之前的操作完成之前它们无法开始工作。

这是那个玩具程序

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

这是结果的 DAG

https://i.stack.imgur.com/lhMp0.jpg

现在:有多少任务?任务数应等于

(Stage * #Partitions in the stage) 的总和


谢谢!请详细说明您对我的文字的回答:1)我对阶段的定义不全面吗?听起来我错过了一个阶段不能包含可以并行的操作的要求。还是我的描述已经严格暗示了这一点? 2)作业必须执行的任务数量取决于分区的数量,而不是处理器或节点的数量,而可以同时执行的任务数量取决于分区的数量处理器,对吧? 3)一个任务可以包含多个操作?
4) 你最后一句话是什么意思?毕竟,分区的数量可能因阶段而异。您的意思是这就是您为所有阶段配置工作的方式吗?
@Make42 当然,分区的数量可能因阶段而异 - 你是对的。我说 sum(..) 是为了考虑这种变化。
哇,你的回答完全没问题,但不幸的是,最后一句话绝对是一个错误的概念。这并不意味着一个阶段中的分区数等于处理器数,但是,您可以根据机器上显示的内核数设置 RDD 的分区数。
@epcpu 这是一个特例-但我同意这会产生误导,因此我将其删除。
p
pedram bashiri

这可能会帮助您更好地理解不同的部分:

阶段:是任务的集合。针对不同的数据子集(分区)运行相同的进程。

任务:表示分布式数据集分区上的一个工作单元。因此,在每个阶段中,任务数 = 分区数,或者正如您所说的“每个阶段每个分区一个任务”。

每个执行器运行在一个纱线容器上,每个容器驻留在一个节点上。

每个阶段使用多个执行器,每个执行器分配有多个 vcore。

每个 vcore 一次只能执行一个任务

所以在任何阶段,多个任务都可以并行执行。正在运行的任务数 = 正在使用的 vcore 数。


这是一篇关于 Spark 架构的非常有用的读物:0x0fff.com/spark-architecture
我没有得到你的第 3 点。据我所知,每个节点可以有多个执行器,所以根据第 3 点:每个节点应该只有一个执行器。你能澄清这一点吗?
@RituparnoBehera 每个节点可以有多个容器,因此可以有多个 Spark 执行器。看看这个链接。 docs.cloudera.com/runtime/7.0.2/running-spark-applications/…
我认为执行程序不必只在纱线容器上运行
H
Harel Gliksman

如果我理解正确的话,有 2 件(相关的)事情会让你感到困惑:

1) 什么决定了任务的内容?

2)什么决定了要执行的任务数量?

Spark 的引擎将连续 rdd 上的简单操作“粘合”在一起,例如:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

因此,当 rdd3 被(懒惰地)计算时,spark 将为 rdd1 的每个分区生成一个任务,每个任务将每行执行过滤器和映射以产生 rdd3。

任务的数量由分区的数量决定。每个 RDD 都有定义数量的分区。对于从 HDFS 读取的源 RDD(例如使用 sc.textFile(...)),分区数是输入格式生成的拆分数。 RDD(s) 上的某些操作可能会导致 RDD 具有不同数量的分区:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

另一个例子是连接:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(大多数)更改分区数量的操作涉及随机播放,例如,当我们这样做时:

rdd2 = rdd1.repartition( 1000 ) 

实际发生的情况是 rdd1 的每个分区上的任务需要产生一个可以被下一个阶段读取的最终输出,以便使 rdd2 恰好有 1000 个分区(他们是如何做到的?HashSort)。这一侧的任务有时被称为“地图(侧)任务”。稍后将在 rdd2 上运行的任务将作用于一个分区(rdd2!),并且必须弄清楚如何读取/组合与该分区相关的映射端输出。这一侧的任务有时被称为“减少(侧)任务”。

这两个问题是相关的:一个阶段中的任务数是分区数(对于“粘合”在一起的连续 rdds 很常见),并且一个 rdd 的分区数可以在阶段之间改变(通过指定分区数到一些例如 shuffle 导致操作)。

一旦一个阶段的执行开始,它的任务就可以占用任务槽。并发任务槽的数量是 numExecutors * ExecutorCores。一般来说,这些可以被来自不同的、非依赖阶段的任务占用。


关注公众号,不定期副业成功案例分享
关注公众号

不定期副业成功案例分享

领先一步获取最新的外包任务吗?

立即订阅