让我们假设在每个时间点只有一个 Spark 作业正在运行。
到目前为止我得到了什么
以下是我对 Spark 中发生的事情的理解:
创建 SparkContext 时,每个工作节点都会启动一个执行程序。执行器是独立的进程 (JVM),连接回驱动程序。每个执行器都有驱动程序的 jar。退出驱动程序,关闭执行程序。每个执行者可以持有一些分区。执行作业时,会根据沿袭图创建执行计划。执行作业分为多个阶段,其中阶段包含尽可能多的相邻(在沿袭图中)转换和动作,但没有洗牌。因此,阶段被洗牌分开。
https://i.stack.imgur.com/NT1MB.png
我明白那个
任务是通过序列化 Function 对象从驱动程序发送到执行程序的命令。执行器反序列化(使用驱动程序 jar)命令(任务)并在分区上执行它。
但
问题)
如何将阶段拆分为这些任务?
具体来说:
任务是由转换和操作确定的,还是可以是多个转换/操作在一个任务中?任务是否由分区确定(例如,每个分区每个阶段一个任务)。任务是否由节点确定(例如,每个节点每个阶段一个任务)?
我的想法(即使是正确的,也只是部分答案)
在 https://0x0fff.com/spark-architecture-shuffle 中,用图像解释了 shuffle
https://i.stack.imgur.com/KxhJE.png
我得到的印象是规则是
每个阶段分为#number-of-partitions 个任务,不考虑节点数
对于我的第一张图片,我会说我有 3 个 map 任务和 3 个 reduce 任务。
对于来自 0x0fff 的图像,我会说有 8 个 map 任务和 3 个 reduce 任务(假设只有三个橙色和三个深绿色文件)。
任何情况下的开放性问题
那是对的吗?但即使这是正确的,我上面的问题也没有全部回答,因为它仍然是开放的,多个操作(例如多个地图)是在一个任务内还是每个操作被分成一个任务。
别人怎么说
What is a task in Spark? How does the Spark worker execute the jar file? 和 How does the Apache Spark scheduler split files into tasks? 相似,但我觉得我的问题没有得到明确的回答。
你在这里有一个很好的大纲。回答您的问题
确实需要为每个阶段的每个数据分区启动一个单独的任务。考虑到每个分区可能驻留在不同的物理位置——例如 HDFS 中的块或本地文件系统的目录/卷。
请注意,Stage
的提交是由 DAG Scheduler
驱动的。这意味着可以将不相互依赖的阶段提交到集群以并行执行:这最大限度地提高了集群的并行化能力。因此,如果我们的数据流中的操作可以同时发生,我们将期望看到多个阶段启动。
我们可以在下面的玩具示例中看到这一点,我们在其中执行以下类型的操作:
加载两个数据源
分别对两个数据源执行一些映射操作
加入他们
对结果执行一些映射和过滤操作
保存结果
那么我们最终会有多少阶段呢?
1 个阶段,每个阶段用于并行加载两个数据源 = 2 个阶段
第三个阶段表示依赖于其他两个阶段的连接
注意:所有对连接数据进行的后续操作可以在同一阶段执行,因为它们必须顺序发生。启动额外的阶段没有任何好处,因为在之前的操作完成之前它们无法开始工作。
这是那个玩具程序
val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")
这是结果的 DAG
https://i.stack.imgur.com/lhMp0.jpg
现在:有多少任务?任务数应等于
(Stage
* #Partitions in the stage
) 的总和
这可能会帮助您更好地理解不同的部分:
阶段:是任务的集合。针对不同的数据子集(分区)运行相同的进程。
任务:表示分布式数据集分区上的一个工作单元。因此,在每个阶段中,任务数 = 分区数,或者正如您所说的“每个阶段每个分区一个任务”。
每个执行器运行在一个纱线容器上,每个容器驻留在一个节点上。
每个阶段使用多个执行器,每个执行器分配有多个 vcore。
每个 vcore 一次只能执行一个任务
所以在任何阶段,多个任务都可以并行执行。正在运行的任务数 = 正在使用的 vcore 数。
如果我理解正确的话,有 2 件(相关的)事情会让你感到困惑:
1) 什么决定了任务的内容?
2)什么决定了要执行的任务数量?
Spark 的引擎将连续 rdd 上的简单操作“粘合”在一起,例如:
rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count
因此,当 rdd3 被(懒惰地)计算时,spark 将为 rdd1 的每个分区生成一个任务,每个任务将每行执行过滤器和映射以产生 rdd3。
任务的数量由分区的数量决定。每个 RDD 都有定义数量的分区。对于从 HDFS 读取的源 RDD(例如使用 sc.textFile(...)),分区数是输入格式生成的拆分数。 RDD(s) 上的某些操作可能会导致 RDD 具有不同数量的分区:
rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).
另一个例子是连接:
rdd3 = rdd1.join( rdd2 , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).
(大多数)更改分区数量的操作涉及随机播放,例如,当我们这样做时:
rdd2 = rdd1.repartition( 1000 )
实际发生的情况是 rdd1 的每个分区上的任务需要产生一个可以被下一个阶段读取的最终输出,以便使 rdd2 恰好有 1000 个分区(他们是如何做到的?Hash 或 Sort)。这一侧的任务有时被称为“地图(侧)任务”。稍后将在 rdd2 上运行的任务将作用于一个分区(rdd2!),并且必须弄清楚如何读取/组合与该分区相关的映射端输出。这一侧的任务有时被称为“减少(侧)任务”。
这两个问题是相关的:一个阶段中的任务数是分区数(对于“粘合”在一起的连续 rdds 很常见),并且一个 rdd 的分区数可以在阶段之间改变(通过指定分区数到一些例如 shuffle 导致操作)。
一旦一个阶段的执行开始,它的任务就可以占用任务槽。并发任务槽的数量是 numExecutors * ExecutorCores。一般来说,这些可以被来自不同的、非依赖阶段的任务占用。
不定期副业成功案例分享
sum(..)
是为了考虑这种变化。