
How are stages split into tasks in Spark?

Let's assume for the following that only one Spark job is running at every point in time.

What I get so far

Here is my understanding of what happens in Spark:

When a SparkContext is created, each worker node starts an executor. Executors are separate processes (JVMs) that connect back to the driver program. Each executor has the jar of the driver program. Quitting the driver shuts down the executors. Each executor can hold some partitions. When a job is executed, an execution plan is created according to the lineage graph. The job is split into stages, where each stage contains as many neighbouring (in the lineage graph) transformations and actions as possible, but no shuffles. Thus stages are separated by shuffles.

https://i.stack.imgur.com/NT1MB.png

I understand that

A task is a command sent from the driver to an executor by serializing the Function object. The executor deserializes (with the driver jar) the command (task) and executes it on a partition.

but

Question(s)

How do I split the stage into those tasks?

Specifically:

Are the tasks determined by the transformations and actions, or can multiple transformations/actions be in one task? Are the tasks determined by the partitions (e.g. one task per stage per partition)? Are the tasks determined by the nodes (e.g. one task per stage per node)?

What I think (only partial answer, even if right)

In https://0x0fff.com/spark-architecture-shuffle, the shuffle is explained with the image

https://i.stack.imgur.com/KxhJE.png

and I get the impression that the rule is

each stage is split into #number-of-partitions tasks, with no regard for the number of nodes

For my first image I'd say that I'd have 3 map tasks and 3 reduce tasks.

For the image from 0x0fff, I'd say there are 8 map tasks and 3 reduce tasks (assuming that there are only three orange and three dark green files).

Open questions in any case

Is that correct? Even if it is, not all of my questions above are answered, because it is still open whether multiple operations (e.g. multiple maps) run within one task or are separated into one task per operation.

What others say

The questions "What is a task in Spark? How does the Spark worker execute the jar file?" and "How does the Apache Spark scheduler split files into tasks?" are similar, but I did not feel that my question was answered clearly there.

I would appreciate it if you could add more insights; I had similar questions.
@Nag: My question was also looking for more insights, that is why I asked :-). Did the answers provide what you were looking for? What kind of insights are you asking for?
Ah, got it. Since this question is a little old, I thought you might have gained some insights into the questions you asked, so I wanted to check with you :-)
@Nag: Well, it has been a couple of years since I last worked with Spark, so a) I would have to read up on Spark again if I wanted to know how it works (I forgot most of the details) and b) what I wrote might be outdated, especially because my post referred mostly to Spark 1.x and there have been a lot of changes in Spark 2.x, as far as I remember. But maybe the changes did not affect the backend architecture; that might also be true.
great. Thanks !!

WestCoastProjects

You have a pretty nice outline here. To answer your questions

A separate task does need to be launched for each partition of data for each stage. Consider that the partitions will likely reside in distinct physical locations, e.g. blocks in HDFS or directories/volumes on a local file system.

Note that the submission of Stages is driven by the DAG Scheduler. This means that stages that are not interdependent may be submitted to the cluster for execution in parallel: this maximizes the parallelization capability on the cluster. So if operations in our dataflow can happen simultaneously we will expect to see multiple stages launched.

We can see that in action in the following toy example in which we do the following types of operations:

load two datasources

perform some map operation on both of the data sources separately

join them

perform some map and filter operations on the result

save the result

So then how many stages will we end up with?

1 stage each for loading the two datasources in parallel = 2 stages

A third stage representing the join that is dependent on the other two stages

Note: all of the follow-on operations working on the joined data may be performed in the same stage because they must happen sequentially. There is no benefit to launching additional stages because they cannot start work until the prior operation has completed.

Here is that toy program

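// One stage: read a text file and turn each line into an (n, n*n) pair
// (sc is an existing SparkContext, e.g. the one provided by spark-shell)
val sfi = sc.textFile("/data/blah/input").map { x => val xi = x.toInt; (xi, xi * xi) }
// Another, independent stage: build a second pair RDD from a local collection
val sp = sc.parallelize((0 until 1000).map { x => (x, x * x + 1) })
// The join depends on both inputs above, so it belongs to a third stage
val spj = sfi.join(sp)
// The follow-on map and filter run in the same stage as the join
val sm = spj.mapPartitions { iter => iter.map { case (k, (v1, v2)) => (k, v1 + v2) } }
val sf = sm.filter { case (k, v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")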

And here is the DAG of the result

https://i.stack.imgur.com/lhMp0.jpg

Now: how many tasks? The number of tasks should be equal to

the sum, over all stages, of the number of partitions in each stage
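As a rough worked example of that sum, here is a minimal sketch in plain Scala. The partition counts (4, 8, 8) are made-up illustration values, not numbers measured from the toy program; the real ones depend on the input splits and on the cluster's parallelism settings. The names TaskCount and totalTasks are hypothetical.

```scala
// Sketch only: the partition counts used in main are hypothetical.
object TaskCount {
  // One task is launched per partition of each stage, so the total number
  // of tasks in the job is the sum of the per-stage partition counts.
  def totalTasks(partitionsPerStage: Seq[Int]): Int = partitionsPerStage.sum

  def main(args: Array[String]): Unit = {
    val partitionsPerStage = Seq(
      4, // stage reading the text file (assumed: 4 input splits)
      8, // stage for the parallelized collection (assumed: 8 slices)
      8  // stage containing the join and follow-on operations (assumed: 8 partitions)
    )
    println(s"total tasks = ${totalTasks(partitionsPerStage)}")
  }
}
```

With these assumed counts the job would launch 4 + 8 + 8 = 20 tasks, regardless of how many nodes or cores are available to run them.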


Thanks! Please elaborate on your answer with regard to my text: 1) Is my definition of stages incomplete? It sounds as if I missed the requirement that a stage cannot contain operations that could run in parallel. Or does my description already strictly imply that? 2) The number of tasks that have to be executed for the job is determined by the number of partitions, not by the number of processors or nodes, while the number of tasks that can be executed at the same time depends on the number of processors, right? 3) A task can contain multiple operations?
4) What did you mean by your last sentence? After all, the number of partitions can vary from stage to stage. Did you mean that this is how you configured your job for all stages?
@Make42 Of course the number of partitions can vary from stage to stage - you are correct. It was my intention by saying sum(..) to take that variation into account.
Wow, your answer was totally OK, but unfortunately the last sentence is definitely a wrong concept. The number of partitions in a stage does not have to equal the number of processors; however, you can set the number of partitions for an RDD according to the number of cores present on your machine.
@epcpu It was a special case - but I agree that would be misleading so I am removing it.
pedram bashiri

This might help you better understand different pieces:

Stage: is a collection of tasks. Same process running against different subsets of data (partitions).

Task: represents a unit of work on a partition of a distributed dataset. So in each stage, number-of-tasks = number-of-partitions, or as you said "one task per stage per partition".

Each executor runs in one YARN container, and each container resides on one node.

Each stage utilizes multiple executors, and each executor is allocated multiple vcores.

Each vcore can execute exactly one task at a time.

So at any stage, multiple tasks can be executed in parallel: number-of-tasks running = number-of-vcores being used.


This is a really useful read on spark architecture: 0x0fff.com/spark-architecture
I didn't get your point number 3. As far as I know each node can have multiple executors, so according to point 3: There should be only one executor per node. Can you clarify this point ?
@RituparnoBehera each node can have multiple containers and thus multiple Spark executors. Check out this link. docs.cloudera.com/runtime/7.0.2/running-spark-applications/…
I don't think executors have to run on just yarn containers
Harel Gliksman

If I understand correctly there are 2 ( related ) things that confuse you:

1) What determines the content of a task?

2) What determines the number of tasks to be executed?

Spark's engine "glues" together simple operations on consecutive rdds, for example:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

so when rdd3 is (lazily) computed, Spark will generate one task per partition of rdd1, and each task will execute both the filter and the map on each line to produce rdd3.
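The "gluing" can be pictured with plain Scala collections standing in for the partitions. This is only a sketch of the idea, not Spark's implementation: the filter and map bodies are placeholder assumptions (the original leaves them elided), and PipelinedTask, runTask and runStage are hypothetical names.

```scala
// Sketch only: plain Scala iterators stand in for RDD partitions;
// no Spark API is used here.
object PipelinedTask {
  // One "task": runs the filter and the map back to back over one partition.
  // Iterators are lazy, so each line passes through both steps in turn and
  // no intermediate collection for "rdd2" is materialized.
  def runTask(partition: Iterator[String]): Int =
    partition
      .filter(_.nonEmpty) // placeholder for rdd1.filter( ... )
      .map(_.length)      // placeholder for rdd2.map( ... )
      .size               // per-partition share of the count

  // The "stage": one task per partition; the partial counts are then summed.
  // Returns (number of tasks, total row count).
  def runStage(partitions: Seq[Seq[String]]): (Int, Int) = {
    val perTask = partitions.map(p => runTask(p.iterator))
    (perTask.size, perTask.sum)
  }
}
```

For three input partitions, runStage runs three tasks, and each of them applies both operations; there is no separate task for the filter and another for the map.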

The number of tasks is determined by the number of partitions. Every RDD has a defined number of partitions. For a source RDD that is read from HDFS ( using sc.textFile( ... ) for example ) the number of partitions is the number of splits generated by the input format. Some operations on RDD(s) can result in an RDD with a different number of partitions:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Another example is joins:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

( Most ) operations that change the number of partitions involve a shuffle. When we do for example:

rdd2 = rdd1.repartition( 1000 ) 

what actually happens is that the task on each partition of rdd1 needs to produce an end-output that can be read by the following stage, so that rdd2 ends up with exactly 1000 partitions ( how do they do it? By hashing or sorting ). Tasks on this side are sometimes referred to as "Map ( side ) tasks". A task that will later run on rdd2 will act on one partition ( of rdd2! ) and has to figure out how to read/combine the map-side outputs relevant to that partition. Tasks on this side are sometimes referred to as "Reduce ( side ) tasks".
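The map-side / reduce-side split can be sketched with plain Scala collections. This is a simplified model of a hash-partitioned shuffle, assuming integer records bucketed by their own hash; it is not how Spark implements repartition internally, and the names ShuffleSketch, bucketOf, mapSideTask, reduceSideTask and shuffleByHash are hypothetical.

```scala
// Sketch only: a toy model of a hash-partitioned shuffle, no Spark API involved.
object ShuffleSketch {
  // Target bucket for a record: non-negative hash modulo the partition count.
  def bucketOf(key: Int, numReducePartitions: Int): Int = {
    val m = key.hashCode % numReducePartitions
    if (m < 0) m + numReducePartitions else m
  }

  // Map-side task: splits ONE input partition into buckets,
  // one bucket per reduce-side partition.
  def mapSideTask(partition: Seq[Int], numReducePartitions: Int): Map[Int, Seq[Int]] =
    partition.groupBy(bucketOf(_, numReducePartitions))

  // Reduce-side task: reads, from every map-side output, the bucket
  // that belongs to its own partition id and combines them.
  def reduceSideTask(id: Int, mapOutputs: Seq[Map[Int, Seq[Int]]]): Seq[Int] =
    mapOutputs.flatMap(_.getOrElse(id, Seq.empty))

  // input.size map-side tasks followed by numReducePartitions reduce-side tasks.
  def shuffleByHash(input: Seq[Seq[Int]], numReducePartitions: Int): Seq[Seq[Int]] = {
    val mapOutputs = input.map(mapSideTask(_, numReducePartitions))
    (0 until numReducePartitions).map(reduceSideTask(_, mapOutputs))
  }
}
```

In this model, two input partitions shuffled into three output partitions mean two map-side tasks in the first stage and three reduce-side tasks in the next one, which mirrors how the task count changes across a shuffle boundary.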

The 2 questions are related: the number of tasks in a stage is the number of partitions ( common to the consecutive rdds "glued" together ) and the number of partitions of an rdd can change between stages ( by specifying the number of partitions to some shuffle causing operation for example ).

Once the execution of a stage commences, its tasks can occupy task slots. The number of concurrent task-slots is numExecutors * ExecutorCores. In general, these can be occupied by tasks from different, non-dependent stages.
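A small arithmetic sketch of the slot count, assuming every task needs exactly one core (Spark's spark.task.cpus setting defaults to 1) and, for the "waves" estimate, that all tasks take about equally long. The object and method names are hypothetical.

```scala
// Sketch only: back-of-the-envelope arithmetic, assuming one core per task.
object TaskSlots {
  // Number of tasks that can run at the same time across the cluster.
  def slots(numExecutors: Int, executorCores: Int): Int =
    numExecutors * executorCores

  // Tasks actually running at once: limited by both the pending tasks and the slots.
  def maxConcurrent(pendingTasks: Int, numExecutors: Int, executorCores: Int): Int =
    math.min(pendingTasks, slots(numExecutors, executorCores))

  // Rough number of "waves" needed to finish a stage (ceiling division),
  // under the assumption that all tasks have similar duration.
  def waves(tasks: Int, numExecutors: Int, executorCores: Int): Int = {
    val s = slots(numExecutors, executorCores)
    (tasks + s - 1) / s
  }
}
```

For example, a stage with 1000 partitions on 5 executors with 4 cores each still consists of 1000 tasks, but only 20 of them run at any moment, so the stage needs roughly 50 waves.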

