Let's assume for the following that only one Spark job is running at every point in time.
What I get so far
Here is what I understand about what happens in Spark:
- When a SparkContext is created, each worker node starts an executor. Executors are separate processes (JVMs) that connect back to the driver program. Each executor has the jar of the driver program. Quitting the driver shuts down the executors. Each executor can hold some partitions.
- When a job is executed, an execution plan is created according to the lineage graph.
- The execution job is split into stages, where each stage contains as many neighbouring (in the lineage graph) transformations and actions as possible, but no shuffles. Thus stages are separated by shuffles.
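The stage-splitting rule described above can be sketched with a toy model in plain Python. This is not Spark's scheduler: `split_into_stages`, the `SHUFFLE_OPS` set, and the example lineage are all assumptions made for illustration, and the lineage is simplified to a linear chain rather than a graph.

```python
# Toy model in plain Python, NOT Spark's scheduler.
# The set of operations treated as shuffles is an illustrative assumption.
SHUFFLE_OPS = {"reduceByKey", "groupByKey", "join", "repartition"}


def split_into_stages(lineage):
    """Cut a linear chain of operation names into stages at every shuffle."""
    stages = [[]]
    for op in lineage:
        if op in SHUFFLE_OPS and stages[-1]:
            # A shuffle closes the current stage; in this model the shuffling
            # operation itself becomes the first operation of the next stage.
            stages.append([])
        stages[-1].append(op)
    return stages


# Hypothetical lineage: a word-count-like chain of operations.
lineage = ["textFile", "flatMap", "map", "reduceByKey", "filter", "collect"]
stages = split_into_stages(lineage)
for index, ops in enumerate(stages):
    print(f"stage {index}: {ops}")
```

In this sketch the single shuffle (`reduceByKey`) yields two stages: the three operations before it, and the shuffle together with everything after it.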
I understand that
- A task is a command sent from the driver to an executor by serializing the Function object.
- The executor deserializes (with the driver jar) the command (task) and executes it on a partition.
but
Question(s)
How do I split the stage into those tasks?
Specifically:
- Are the tasks determined by the transformations and actions, or can multiple transformations/actions be in a task?
- Are the tasks determined by the partition (e.g. one task per stage per partition)?
- Are the tasks determined by the nodes (e.g. one task per stage per node)?
What I think (only partial answer, even if right)
In https://0x0fff.com/spark-architecture-shuffle, the shuffle is explained with the image
and I get the impression that the rule is
each stage is split into #number-of-partitions tasks, with no regard for the number of nodes
For my first image I'd say that I'd have 3 map tasks and 3 reduce tasks.
For the image from 0x0fff, I'd say there are 8 map tasks and 3 reduce tasks (assuming that there are only three orange and three dark green files).
Open questions in any case
Is that correct? Even if it is, my questions above are not all answered, because it is still open whether multiple operations (e.g. multiple maps) run within one task or are separated into one task per operation.
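To make the open question concrete, the second possibility (several maps inside one task) can be pictured with a toy model in plain Python. This is not Spark code: `run_task`, the partition data, and the two lambdas are made up for illustration. As far as I know, Spark does chain consecutive shuffle-free transformations like this within a stage, but the sketch only shows what that would mean.

```python
# Toy model in plain Python, NOT Spark code.
def run_task(partition, functions):
    """One task: push each element of one partition through every function."""
    output = []
    for element in partition:
        for fn in functions:
            element = fn(element)
        output.append(element)
    return output


# Hypothetical data: one dataset split into 3 partitions.
partitions = [[1, 2], [3, 4], [5, 6]]
# Two consecutive maps with no shuffle between them (illustrative functions).
maps = [lambda x: x + 1, lambda x: x * 10]

# One task per partition; each task runs both maps.
results = [run_task(partition, maps) for partition in partitions]
num_tasks = len(results)
print(num_tasks, results)
```

Under this model the number of tasks depends only on the number of partitions (3 here), not on how many maps are chained.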
What others say
"What is a task in Spark? How does the Spark worker execute the jar file?" and "How does the Apache Spark scheduler split files into tasks?" are similar, but I did not feel that my question was answered clearly there.
Best Answer
You have a pretty nice outline here. To answer your questions:
A task does need to be launched for each partition of data for each stage. Consider that each partition will likely reside in a distinct physical location, e.g. blocks in HDFS or directories/volumes for a local file system.
Note that the submission of stages is driven by the DAG Scheduler. This means that stages that are not interdependent may be submitted to the cluster for execution in parallel, which maximizes the parallelization capability on the cluster. So if operations in our dataflow can happen simultaneously, we will expect to see multiple stages launched.
We can see that in action in the following toy example in which we do the following types of operations:
So then how many stages will we end up with?
join that is dependent on the other two stages
Here is that toy program
And here is the DAG of the result
Now: how many tasks? The number of tasks should be equal to
Sum of (Stage * #Partitions in the stage)
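As a quick arithmetic check of that rule, here is a minimal sketch in plain Python (not Spark code). The stage names are hypothetical, and the partition counts are the 8 map and 3 reduce partitions that the question reads off the 0x0fff image.

```python
# Plain-Python arithmetic, NOT Spark code.
# Stage names are hypothetical; partition counts come from the question's
# reading of the shuffle image (8 map partitions, 3 reduce partitions).
partitions_per_stage = {"map stage": 8, "reduce stage": 3}

# One task per partition in each stage, summed over all stages.
total_tasks = sum(partitions_per_stage.values())
print(total_tasks)
```

With these counts the job would run 8 + 3 = 11 tasks, regardless of how many nodes the partitions are spread over.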