Spark Optimize Part 1: Spark fundamentals

To understand how to optimize Spark, we first need to understand how Spark works. Let's start with the fundamentals.

RDD

RDD stands for Resilient Distributed Dataset. It is Spark's abstraction for a collection of data distributed across a cluster, whether that data lives in memory or on disk. It has four main properties: partitions, partitioner, dependencies and compute.

Partitions and partitioner

The partitioner defines how records are grouped, and partitions are the containers that hold the records sharing the same grouping. Based on these two properties, Spark can distribute computation across different nodes in a cluster, which makes it scalable.
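
As a rough illustration of how a partitioner decides where a record goes, here is a minimal pure-Python sketch of hash partitioning. It is a toy model, not Spark's code: hash_partition and partition_records are hypothetical names, and CRC32 stands in for the key hash only so that results are stable across runs.

```python
import zlib
from collections import defaultdict


def hash_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition index (toy stand-in for a hash partitioner)."""
    # CRC32 is an assumption of this sketch, not the hash function Spark uses.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_records(records, num_partitions):
    """Group (key, value) records into partitions by key."""
    partitions = defaultdict(list)
    for key, value in records:
        partitions[hash_partition(key, num_partitions)].append((key, value))
    return dict(partitions)


records = [("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 4)]
parts = partition_records(records, num_partitions=3)

for index, recs in sorted(parts.items()):
    print(index, recs)
```

Whatever the hash values turn out to be, both "apple" records end up in the same partition, which is the property that lets each node work on its own keys independently.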

Dependencies and compute

In Spark, we do not 'create' RDDs from scratch; we transform one RDD (or data source) into another. Dependencies record the upstream RDDs, also called the lineage, while compute stores how to derive the current RDD from its parents. Together they enable Spark's fault tolerance: a lost partition can be recomputed from its parents. This is where the 'Resilient' in RDD comes from.
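
To make the role of dependencies and compute more concrete, here is a small pure-Python sketch of lineage-based recovery. ToyRDD and its methods are hypothetical names invented for this illustration; the real RDD class is far richer.

```python
class ToyRDD:
    """A toy RDD: a set of partitions plus the recipe to rebuild them."""

    def __init__(self, num_partitions, parent=None, compute=None, source=None):
        self.num_partitions = num_partitions
        self.parent = parent      # dependency: the upstream RDD (lineage)
        self.compute = compute    # how to derive a partition from the parent's
        self.source = source      # raw partitions, only set on the root RDD
        self.cache = {}           # partitions materialized so far

    def map(self, fn):
        # A transformation only records lineage; nothing is computed here.
        return ToyRDD(self.num_partitions, parent=self,
                      compute=lambda part: [fn(x) for x in part])

    def get_partition(self, i):
        if i in self.cache:
            return self.cache[i]
        if self.parent is None:
            data = list(self.source[i])
        else:
            # Rebuild from the parent partition using the stored recipe.
            data = self.compute(self.parent.get_partition(i))
        self.cache[i] = data
        return data


root = ToyRDD(2, source=[[1, 2], [3, 4]])
doubled = root.map(lambda x: x * 2)

first = doubled.get_partition(1)     # computed on demand from the lineage
doubled.cache.pop(1)                 # simulate losing the partition
rebuilt = doubled.get_partition(1)   # recomputed from the parent RDD
```

Because the child keeps a reference to its parent and to the function that derives it, losing a partition only costs a recomputation, not the data.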

DAG & Stage

DAG stands for Directed Acyclic Graph. It consists of vertices and edges. In Spark, vertices represent RDDs and edges represent dependencies. The DAG describes the whole compute job to be processed.

Stage: Spark divides the DAG into stages. Starting from the action, it traces the operations backward and cuts the graph at every shuffle, until it reaches the beginning of the DAG. Since there is no shuffle inside a stage, Spark fuses all operations in the stage into one function, which avoids generating unnecessary intermediate results and saving them to disk. This is one reason it runs faster than Hadoop MapReduce.
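
The backward walk can be sketched in a few lines of Python. This is a deliberately simplified model: it assumes a linear chain of operations rather than a real graph, the needs_shuffle flags are set by hand, and split_into_stages is a hypothetical helper rather than anything in Spark.

```python
def split_into_stages(operations):
    """Split a linear chain of (name, needs_shuffle) operations into stages.

    Walks backward from the last operation (the action) and closes a stage
    whenever it meets an operation that needs a shuffle.
    """
    stages = []
    current = []
    for name, needs_shuffle in reversed(operations):
        current.append(name)
        if needs_shuffle:
            # The shuffle operation reads shuffled data, so it opens its
            # stage; everything upstream belongs to an earlier stage.
            stages.append(list(reversed(current)))
            current = []
    if current:
        stages.append(list(reversed(current)))
    return list(reversed(stages))  # return stages in execution order


job = [
    ("textFile", False),
    ("flatMap", False),
    ("map", False),
    ("reduceByKey", True),
    ("filter", False),
    ("collect", False),
]
stages = split_into_stages(job)

for number, stage in enumerate(stages):
    print(number, stage)
```

The three narrow operations before reduceByKey form one stage and can be fused into a single pass over each partition; the shuffle then starts the second stage.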

Task

A task is the smallest unit of work that can be scheduled. It is launched on an executor and represents the computation applied independently to one partition of an RDD. In other words, it is a copy of the stage's work running on a single node for a single partition.

Schedulers

When Spark runs a job, it needs to divide the job and distribute the resulting sub-jobs (tasks) to executors. To do so, it needs to know how to divide the job (as described above) and what hardware is available, so that tasks go to the right place. The whole process is shown below:

  1. Divide DAG into stages
  2. Create distributed tasks and tasksets
  3. Get information about available hardware resources in the cluster
  4. Set priority for tasks/tasksets
  5. Distribute tasks to executors in order

The schedulers of Spark play critical roles here.

DAGScheduler

Transforms the DAG into stages, then creates tasks/tasksets from each stage. It runs solely on the driver and is created as part of SparkContext's initialization (right after TaskScheduler and SchedulerBackend are ready).

SchedulerBackend

It manages the hardware resources in the cluster. It is an abstract interface (a trait in Spark's Scala source). Spark chooses the right implementation of SchedulerBackend based on the master URL the user provides.

SchedulerBackend uses a data structure called ExecutorDataMap (HashMap[String, ExecutorData]) to store the status of each executor. ExecutorData contains the executor's RPC Endpoint, RpcAddress, Host, Free Cores, Total Cores, Log URLs (Map[String, String]), Attributes (Map[String, String]), Resources Info (Map[String, ExecutorResourceInfo]) and ResourceProfile ID.

ExecutorDataMap structure

SchedulerBackend provides computing resources as WorkerOffers. A WorkerOffer encapsulates an executor's id, host and free cores.
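
The relationship between the executor status map and the offers can be pictured with a short Python sketch. The field names mirror the ones listed above, but the classes and make_offers are hypothetical simplifications, not Spark's actual definitions.

```python
from dataclasses import dataclass


@dataclass
class ExecutorData:
    """A trimmed-down view of per-executor status (only three fields kept)."""
    host: str
    free_cores: int
    total_cores: int


@dataclass(frozen=True)
class WorkerOffer:
    """What the scheduler backend hands to the task scheduler."""
    executor_id: str
    host: str
    cores: int


def make_offers(executor_data_map):
    """Turn the executor status map into one offer per executor."""
    return [
        WorkerOffer(executor_id, data.host, data.free_cores)
        for executor_id, data in executor_data_map.items()
    ]


executor_data_map = {
    "exec-1": ExecutorData(host="node-a", free_cores=2, total_cores=4),
    "exec-2": ExecutorData(host="node-b", free_cores=4, total_cores=4),
}
offers = make_offers(executor_data_map)
```

An offer carries only what is needed for placement (who, where, how many free cores); the rest of the executor status stays in the backend.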

TaskScheduler

Sends tasks to executors according to scheduling rules.

It sets priorities for stages and for the tasks inside each stage. For stages, it has two modes: FIFO (First In First Out) and FAIR. FIFO runs all stages sequentially, while FAIR lets the user set priorities and runs stages accordingly. For tasks, when TaskScheduler receives WorkerOffers from SchedulerBackend, it sets priority based on locality-aware scheduling. The locality levels, from most to least preferred, are: process local, node local, rack local, any. DAGScheduler is also involved here: when stages and tasks are created, it designates the host, executor id, or even process id that each task prefers. TaskScheduler takes these preferences into consideration.

In short, TaskScheduler tends to send the task code to where the task's data is located. Moving code is cheaper in I/O cost than moving data to the executor, which improves speed.
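
A minimal Python sketch of locality-aware selection follows. It only models the preference order described above; the dictionaries, the rack field, and the functions locality_level and pick_offer are assumptions made for the example, and the real scheduler has more moving parts.

```python
# Locality levels from most to least preferred.
LOCALITY_ORDER = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def locality_level(task, offer):
    """Classify how close an offer is to the task's preferred data location."""
    if offer["executor_id"] == task.get("preferred_executor"):
        return "PROCESS_LOCAL"
    if offer["host"] == task.get("preferred_host"):
        return "NODE_LOCAL"
    if offer["rack"] == task.get("preferred_rack"):
        return "RACK_LOCAL"
    return "ANY"


def pick_offer(task, offers):
    """Choose the offer with the best (lowest-index) locality level."""
    return min(offers,
               key=lambda o: LOCALITY_ORDER.index(locality_level(task, o)))


task = {
    "preferred_executor": "exec-9",   # not among the offers below
    "preferred_host": "node-b",
    "preferred_rack": "rack-1",
}
offers = [
    {"executor_id": "exec-1", "host": "node-a", "rack": "rack-1"},
    {"executor_id": "exec-2", "host": "node-b", "rack": "rack-1"},
    {"executor_id": "exec-3", "host": "node-c", "rack": "rack-2"},
]
chosen = pick_offer(task, offers)
print(chosen["executor_id"], locality_level(task, chosen))
```

Here the preferred executor is unavailable, so the task falls back to an executor on the same host, which still avoids moving the data across the network.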

Spark Storage System

It serves three purposes: RDD cache, shuffle intermediate files and broadcast values. It speeds up data access and reduces the cost of recomputation when a job fails (the intermediate results are already there). It can use both memory and disk: RDD cache uses both, shuffle files use only disk, and broadcast values are kept mainly in memory.

BlockManager

The most fundamental component.

It manages the storage for blocks (chunks of data) that can be stored in memory and on disk.

It also communicates with other nodes and with the master. It reports its metadata to the master and syncs global metadata from it by communicating with BlockManagerMaster periodically. It also pulls data from and pushes data to other nodes directly.

MemoryStore

Located in the executor. It can store both object values and byte arrays, where byte arrays are the serialized form of object values. Both are stored as MemoryEntry: DeserializedMemoryEntry for object values and SerializedMemoryEntry for byte arrays. Each block (for example, a cached RDD partition) maps to one entry in a LinkedHashMap[BlockId, MemoryEntry].
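
The two kinds of entries can be mimicked with a small Python sketch. ToyMemoryStore and its entry classes are hypothetical, pickle stands in for Spark's serializers, and the block ids are illustrative values only.

```python
import pickle
from collections import OrderedDict
from dataclasses import dataclass


@dataclass
class DeserializedEntry:
    values: list   # live objects


@dataclass
class SerializedEntry:
    data: bytes    # the same values, serialized into a byte array


class ToyMemoryStore:
    def __init__(self):
        # Ordered map from block id to its entry.
        self.entries = OrderedDict()

    def put_values(self, block_id, values):
        self.entries[block_id] = DeserializedEntry(list(values))

    def put_bytes(self, block_id, values):
        self.entries[block_id] = SerializedEntry(pickle.dumps(list(values)))

    def get_values(self, block_id):
        entry = self.entries[block_id]
        if isinstance(entry, SerializedEntry):
            # Serialized blocks must be decoded before use.
            return pickle.loads(entry.data)
        return entry.values


store = ToyMemoryStore()
store.put_values("rdd_0_0", [1, 2, 3])   # kept as objects
store.put_bytes("rdd_0_1", [4, 5, 6])    # kept as bytes
```

Object entries are ready to use but take more room; byte entries are more compact but pay a decoding cost on every read.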

DiskStore

Located in the executor. It only supports byte arrays. It uses DiskBlockManager to maintain the mapping between data blocks and physical files. Every file has its own block id. These files are saved under spark.local.dir.

Memory Management

Spark divides memory into two parts: On-heap memory and Off-heap memory. By default, Spark only uses On-heap memory.

On-heap memory

If YARN is the resource manager, it first sets aside 384MB or 10% of the executor memory size, whichever is larger, as overhead. This memory accounts for things like VM overheads, interned strings, other native overheads, etc. It is controlled by spark.yarn.executor.memoryOverhead (renamed spark.executor.memoryOverhead in Spark 2.3).

The real usable on-heap space is controlled by spark.executor.memory. It is divided into four parts:
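
The "whichever is larger" rule is easy to check with a couple of numbers. The function name below is hypothetical; the 384MB floor and the 10% factor are the values stated above.

```python
def yarn_memory_overhead_mb(executor_memory_mb: int) -> int:
    """Overhead per executor: the larger of 384MB and 10% of executor memory."""
    return max(384, int(executor_memory_mb * 0.10))


small = yarn_memory_overhead_mb(2048)   # 10% is about 204MB, so the floor applies
large = yarn_memory_overhead_mb(8192)   # 10% is about 819MB, above the floor
print(small, large)
```

For small executors the fixed floor dominates, so the container requested from YARN is noticeably larger than the executor memory alone.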

  • Reserved Memory: a fixed amount (300MB) set aside for Spark's internal objects. It is not controlled by the user.
  • Execution Memory: mainly used to store temporary data in the calculation process of Shuffle, Join, Sort, Aggregation, etc. Since Spark 1.6, it is allocated dynamically: it shares one unified region with storage memory and can use any free space there. It also has higher priority than storage memory: when it needs space, it can reclaim what storage memory has borrowed, whereas space it has borrowed itself is only released once its own work is done. Its initial size is (spark.executor.memory - reserved memory) * spark.memory.fraction * (1 - spark.memory.storageFraction).
  • Storage Memory: mainly used to store Spark cache data, such as RDD cache, unroll data, and so on. It can also borrow free space from execution memory, but when execution memory needs that space back, storage memory must release it immediately, whether or not the job has finished. Its initial size is (spark.executor.memory - reserved memory) * spark.memory.fraction * spark.memory.storageFraction.
  • User Memory: mainly used to store the data needed for RDD conversion operations, such as the information for RDD dependency. Its size is (spark.executor.memory - reserved memory) * (1 - spark.memory.fraction).
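
To see how the fractions translate into sizes, here is a small worked calculation in Python. It rests on assumptions that should be checked against your Spark version: a fixed 300MB of reserved memory subtracted before the fractions apply, and default values of 0.6 for spark.memory.fraction and 0.5 for spark.memory.storageFraction. The function name is hypothetical.

```python
RESERVED_MB = 300  # assumed fixed reserved memory, subtracted before the split


def on_heap_regions(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return the initial size of each on-heap region in MB."""
    usable = executor_memory_mb - RESERVED_MB
    unified = usable * memory_fraction   # shared by storage and execution
    return {
        "reserved": RESERVED_MB,
        "storage": unified * storage_fraction,
        "execution": unified * (1 - storage_fraction),
        "user": usable * (1 - memory_fraction),
    }


regions = on_heap_regions(4396)   # e.g. spark.executor.memory of roughly 4.3GB
for name, size_mb in regions.items():
    print(f"{name}: {size_mb:.1f} MB")
```

The storage and execution figures are only starting points, since each side can borrow free space from the other at runtime.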

Off-heap memory

It is memory outside the JVM heap, managed by Spark itself. It only has Storage Memory and Execution Memory. It is disabled by default; set spark.memory.offHeap.enabled to enable it. Its size is controlled by spark.memory.offHeap.size.

Conclusion

We now know the core concepts and how Spark works. Based on these, we already have some idea of the potential bottlenecks of Spark jobs. We will discuss how to optimize them in part 2 of the series. Stay tuned!