A systematic approach, tips, techniques, and best practices to enhance Apache Spark job performance
Follow a systematic approach to optimize Spark job performance instead of randomly tuning different parameters and wasting endless amounts of time.
A 5-step approach for performance tuning, borrowed from quality assurance:
- Measure and capture the performance metrics before optimization (use the Spark UI).
- Identify the biggest bottleneck, the one whose removal will improve performance the most (use the Spark UI to determine whether the bottleneck is CPU, Memory, or I/O).
- Fix one issue at a time; don’t make more than one change at a time.
- Measure again after fixing the identified bottleneck.
- If performance improved, commit the fix; otherwise go back to step 2 and repeat the process.
The 3 basic components of a distributed computing architecture: CPU, Memory, and I/O (Disk, Network Bandwidth)
It’s easy to write a Spark program, but it’s not trivial to write Spark code that performs well in a given context; the same applies to other modern distributed processing engines. Modern distributed computing architectures operate similarly at the fundamental level, and knowing the basic building blocks helps in tuning parallel processing engines.
Optimization of the massively parallel processing engines depends on the following three properties:
CPU, Memory, and I/O (Disk, Network Bandwidth)
Apache Spark Core Components
Spark follows the leader/follower architecture pattern. A Spark application consists of a driver program that coordinates with an internal or external resource manager and schedules jobs on the followers.
Three important concepts to understand in Spark at a high level:
- An RDD is a distributed collection of data, created from input sources.
- Transformations are Spark API operations that transform one RDD/Dataset into another RDD/Dataset (e.g. map, flatMap, filter).
- Actions trigger computation or execution to calculate or output data (e.g. collect, take, first).
Apache Spark Execution Model:
The Spark driver divides the application code into a sequence of jobs and instructs the workers to execute the resulting tasks on RDD partitions.
- A Spark application is a user program developed using the Spark APIs.
- A Spark application executes inside a program called the driver (the driver contains the equivalent of the ‘main’ function in other programming languages).
- The driver creates a set of jobs based on the application.
- Each action (e.g. collect, first, take) triggers a job.
- Jobs run in sequence by default; in the above diagram, Job-2 is executed after Job-1 is completed.
- It is possible to have Job-1 and Job-2 run in parallel; it is complex and not commonly used, but know that it is possible to run two jobs in parallel.
- Each job is divided into multiple stages, and each stage depends on the previous stage. Stages run in sequence, not in parallel.
- A shuffle operation (e.g. reduceByKey, repartition) creates a new stage boundary and therefore a new stage.
- Stages are split into multiple tasks.
- Tasks are the smallest units of work and run in parallel.
- A single task uses a core/slot/thread to process a partition.
- In Spark, core/slot/thread means a single unit of work/execution (ignore the physical hardware CPU core terminology).
- Similar kinds of tasks are run in parallel across all of the nodes.
Note:
Modern-day CPUs commonly have multiple cores and multiple threads per core. In the context of Spark, core/slot/thread mean the same thing. In terms of physical hardware, 4 CPUs, each with 2 cores and 2 threads per core, provide a total of 16 slots/cores (4 CPUs x 2 cores x 2 threads) available for tasks to be allocated and run in parallel.
Worker Configuration
In the above diagram, the worker configuration is different on each node even though they have a similar number of CPUs.
- Worker-1 has two executors running as two separate JVM processes, which means that if data has to be shuffled from Task 1 to Task 3, it requires inter-process communication (IPC), as the tasks live in different processes.
- Worker-2 is a fat worker where a single executor utilizes all available CPUs. If Task 5 has to send shuffle data to Task 7, it doesn’t require IPC, as both tasks are within the same JVM process and use inter-thread communication.
- Shuffles between tasks in Worker-1 take more time as they have to go through IPC, compared with the tasks in Worker-2, which are in the same JVM process. The downside is that if a single task crashes in Worker-2, the entire worker crashes because it is a single JVM, while in Worker-1 one of the executors on the machine can continue to run, as it is isolated from errors in the other executor’s JVM.
There is no ideal choice between light executors and fat executors; it all depends on the workload, and both work well. While Spark can work on machines with different configurations and varied setups of slots/nodes, the suggested approach is a uniform configuration so that parallelization is more or less equal across all nodes and executors. Varied configurations might result in the same tasks taking more time on one worker compared to another. More details on worker configuration here.
Execution graph of wordcount example
Let’s take a simple word count program and understand the execution model.
//$spark-shell --master local[2]
//Scala
val textFile = sc.textFile("README.md")
val rddWordCount = textFile
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
val count = rddWordCount.collect()
count.foreach(println)
The Spark UI shows valuable information and is the place to understand jobs and performance metrics. Execute the example word count program in spark-shell and open a browser at localhost:4040. Use the appropriate URL for a non-local Spark environment.
Spark identified one job, triggered by the collect() action, as seen in the UI. Take note of the information here: the duration taken to complete the job, the number of stages (2), the number of tasks (4) across all stages, etc. If there are multiple jobs in the application, the total time is the sum of the time taken by the individual jobs. Two tasks run in parallel with the configuration $spark-shell --master local[2].
Clicking on collect takes you from the Jobs screen to the Stages screen shown above. As discussed earlier, a stage is split when there is a shuffle boundary. In the case of the word count program, reduceByKey results in a shuffle because we are combining data related to a single key (or word) across all partitions. So two stages are created, one at the map operation and one at the collect action. On the rightmost side of the image you can see the most important information: the size of the input file and the shuffle read and write bytes. Note that the map tasks write the shuffle data and the collect stage reads it.
A detailed view of the application is shown when you click the DAG Visualization link at the top. The reduceByKey operation requires a shuffle, so the DAG is split into two stages at the point where reduceByKey appears in the application code.
A detailed screen for a stage is reached by clicking on each of the individual stage links in the job details screen.
A lot of valuable information is available on this screen starting with summary info on the top left.
Review every metric through a performance lens. Investigate glaring, obvious, out-of-normal values, dig into the details, and find the reason and a solution. Tuning 10 seconds down to 8 seconds might not make a difference, but reducing 1 minute to 10 seconds makes a big difference. Identify and prioritize metrics with high values (though not in all cases) for tuning.
Task execution time is the sum of Scheduler Delay, Deserialization Time, Shuffle Read Time (optional), Executor Runtime, Shuffle Write Time (optional), Result Serialization Time, and Getting Result Time, well described here.
Our goal in performance tuning is to reduce task execution time. A task is the lowest unit of work in Spark, and tuning it and the parameters that impact it will improve performance.
A detailed explanation of the UI and metrics is available here. Familiarize yourself with the definition of each metric. Before you do performance tuning, capture these metrics; they will help you compare after tuning and confirm the improvement.
Focus on the locality level of the tasks. Ideally, if you see anything other than PROCESS_LOCAL or NODE_LOCAL, it should be investigated. The worker configuration explained above (light or fat) and the amount of data shuffling impact data locality. More details on data locality here.
Performance Techniques, tips, best practices and anti-patterns:
The three fundamental attributes to tune in massively parallel distributed systems for better performance are Computation (CPU), Data (Memory), and Input/Output (read, write, disk, network bandwidth). Our job is to select techniques that optimize one or more of these three attributes.
1. Tune the number of partitions to be a multiple of the number of cores.
Impact: CPU
- Illustrating with an example: assume there are ten cores in our cluster, our dataset has 20 partitions, and each task takes 2 seconds to complete. When we run this computation, the first set of 10 partitions takes 2 seconds (all running in parallel) and the second set of 10 partitions takes another 2 seconds, for a total of 4 seconds. Now assume the same 10 cores but a dataset with 21 partitions: the first 10 partitions are scheduled and complete in 2 seconds, the next 10 partitions take another 2 seconds, and the one remaining partition is scheduled for another 2 seconds, for a total of 6 seconds. For the 21st partition, only one task executes while the other slots sit idle, wasting the available slots and effectively reducing parallelism. Getting task parallelization right is important to improve performance; see the sketch below.
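A minimal sketch, reusing the README.md word data from the earlier example and reading the slot count from sc.defaultParallelism, of checking the partition count and repartitioning to a multiple of the available cores:
//Scala
val cores = sc.defaultParallelism                 // total slots available to the application
val words = sc.textFile("README.md").flatMap(_.split(" "))
println(s"partitions = ${words.getNumPartitions}, cores = $cores")
// Repartition to an exact multiple of the core count (two full waves of tasks here),
// so no wave runs with mostly idle slots.
val balanced = words.repartition(cores * 2)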
2. Filter the data early
Impact: I/O, Data (Memory)
Filter the data as early as possible, ideally at the beginning of the data flow. Early filtering has a significant impact on the performance of joins: reducing the dataset size via filtering reduces the amount of data to shuffle and improves join performance. A sketch follows below.
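A minimal sketch, assuming hypothetical orders(order_id, customer_id, amount, year) and customers(customer_id, country) DataFrames, of filtering before a join rather than after it:
//Scala
import spark.implicits._
// Less efficient: join first, filter later (the full orders dataset is shuffled for the join).
val joinedThenFiltered = orders.join(customers, "customer_id").filter($"year" === 2021)
// Better: filter early so far less data is shuffled. Catalyst can often push such a filter
// below the join on its own, but don't rely on it; verify with the explain plan.
val filteredThenJoined = orders.filter($"year" === 2021).join(customers, "customer_id")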
3. Cache/persist only if an RDD/DataFrame is repeatedly used
Impact: Memory, I/O
Cache the RDD/DataFrame if you use it frequently, in multiple computations later in the program. Avoid caching if the RDD/DataFrame is not reused at all; a large cached DataFrame can quickly put pressure on executor memory.
import spark.implicits._
val df1 = spark.read.json("data/sample/employees.json")
df1.show() //display the dataframe values
//+-------+------+
//| name|salary|
//+-------+------+
//|Michael| 3000|
//| Andy| 4500|
//| Justin| 3500|
//| Berta| 4000|
//+-------+------+
val df2 = df1.filter($"salary" > 3500)
df2.show() //display filtered values
//+-----+------+
//| name|salary|
//+-----+------+
//| Andy| 4500|
//|Berta| 4000|
//+-----+------+
In the above example, if you look at the Spark UI you can see that for df2.show() the input file is read again, since df1 is not cached or persisted. This causes huge performance issues on large datasets. The optimal approach in this scenario is to cache df1, taking into account the memory pressure in the cluster at that point in time.
df1.cache()
df2.show()
Once you cache df1 and then run df2.show(), the data is read from memory instead of disk, thereby reducing I/O load and improving performance.
//if df1 is not reused at all, it is better to rewrite as
val df2 = spark.read.json("data/sample/employees.json")
.filter($"salary" > 3500)
4. Watch out for partitions with empty data; repartition or coalesce to optimize.
Impact: CPU
In some scenarios, an operation reduces the amount of data but not necessarily the number of partitions. For instance, suppose you read data from a file and apply a filter. The initial dataset may have 100 partitions, but the filtered data may reside in only 10 of them; it is also possible that 2 or 3 partitions of the filtered dataset hold the bulk of the data, creating data skew.
Running without repartition/coalesce wastes CPU cycles, since tasks are scheduled for all partitions whether they contain 0 bytes or not. The problem becomes exaggerated in large clusters with hundreds of available slots when filtering removes 90% of the input data. Since filtering can result in empty partitions, repartition or coalesce to improve overall performance, as in the sketch below.
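A minimal sketch, reusing the employees.json example from above, of coalescing after a selective filter:
//Scala
import spark.implicits._
val df1 = spark.read.json("data/sample/employees.json")
// A highly selective filter can leave many partitions empty or nearly empty.
val highEarners = df1.filter($"salary" > 3500)
println(s"partitions after filter = ${highEarners.rdd.getNumPartitions}")
// coalesce() merges partitions without a full shuffle; use repartition()
// instead if the remaining data is skewed and needs to be rebalanced.
val compacted = highEarners.coalesce(2)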
5. Check the physical plan to verify that filters are applied
Impact: I/O, Memory
import spark.implicits._
val df1 = spark.read.json("data/sample/employees.json").cache()
val df2 = df1.filter($"salary" > 3500)
df2.show() //display filtered values
df2.explain()
== Physical Plan ==
*(1) Project [name#7, salary#8L]
+- *(1) Filter (isnotnull(salary#8L) AND (salary#8L > 3500))
+- FileScan json [name#7,salary#8L] Batched: false, DataFilters: [isnotnull(salary#8L), (salary#8L > 3500)], Format: JSON, Location: InMemoryFileIndex[data/sample/employees.json], PartitionFilters: [], PushedFilters: [IsNotNull(salary), GreaterThan(salary,3500)], ReadSchema: struct<name:string,salary:bigint>
Three kinds of filters (DataFilters, PushedFilters, PartitionFilters) can be seen in the physical plan; the data is read from memory and the filter condition is pushed down. In this case, the dataset has no partition filters. Check the plan to see whether the source supports partitioning and whether a partition filter is indeed applied. More details here.
6. The capabilities of spark.read.xxxx() vary per reader.
Impact: I/O, Memory
If you read from a source, apply a filter, and assume that the filter is pushed down to the source so only the required data is read, be aware that this does not work in all cases. Whether the filter is pushed down to the source depends on the reader's ability to perform push-down queries. A database might support it and send only the filtered dataset; a text or JSON file reader may not, so the complete file has to be read before the filter is applied. The query explain plan shows whether the pushdown filter is applied.
val df1 = spark.read.json("data/sample/employees.json")
.filter($"salary" > 3500)
7. Be wary of SELECT * queries; explicitly define the columns. Defining explicit columns will not help if the source doesn’t support projection pushdown.
Impact: I/O, Memory
Anyone with a SQL background knows that SELECT * queries are not optimal, since they scan all of the data. Selecting only the required columns is the best practice to reduce the amount of data read from disk and over the network. It helps reduce job time to a large extent, as reading from disk or the network is slow compared with reading from RAM.
In some cases, this optimization will not have an impact. For instance, if you are reading a file from a file system or blob store that does not support projection filters, Spark has to read the complete file and then select the particular columns. Explicit column selection helps with columnar formats like Parquet, but if you are reading row-format files such as text, CSV, or Avro, it may not help. So don’t assume that using explicit columns will always help; verify with the explain plan or the Spark UI, as in the sketch below.
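A minimal sketch, assuming a hypothetical Parquet copy of the employees data at data/sample/employees.parquet, of selecting explicit columns and confirming the projection in the plan:
//Scala
val employees = spark.read.parquet("data/sample/employees.parquet")
// Select only the columns needed downstream instead of reading everything.
val salaries = employees.select("name", "salary")
// With a columnar source, ReadSchema in the physical plan should list only these columns.
salaries.explain()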
8. Joins are the first candidates for optimization and the most significant area of performance impact.
Impact: Shuffle, I/O (Network), Memory
A lot of information is available on join optimization; I will not cover it here in detail, but the basic things to take care of are:
- Reduce data before joining with other datasets
- Make sure data is not read again for joining purposes
- Handle data Skew and avoid empty or uneven partitions in datasets
- Reduce the amount of the data to shuffle
- Use analytical/window functions in place of self-joins (see the sketch after the links below)
Some good articles — join optimizations, art of joining
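A minimal sketch, assuming a hypothetical salaries(name, dept, salary) DataFrame, of using a window function instead of a self-join to find each department's top earners:
//Scala
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max
// Compute each department's maximum salary with a window, instead of joining
// salaries against its own per-department aggregate (a self-join).
val byDept = Window.partitionBy("dept")
val withDeptMax = salaries.withColumn("dept_max_salary", max($"salary").over(byDept))
// Rows earning the department maximum, without a self-join.
val topEarners = withDeptMax.filter($"salary" === $"dept_max_salary")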
9. Avoid shuffling as much as possible, but shuffling is not always bad.
Impact: I/O (Network bandwidth)
Shuffling data is a critical aspect of distributed parallel processing systems. Reducing data shuffling is good in the majority of cases, as it reduces the time spent exchanging data between tasks, but do not assume that it is good in all cases. Sometimes it is best to force a shuffle to improve performance. A non-splittable compressed file (for instance, a gzipped text file) is read as a single partition until an explicit shuffle operation such as reduceByKey or repartition redistributes the data.
In these cases, it makes sense to repartition immediately after reading the file, forcing the shuffle operation to improve the computation time. More details on shuffling here.
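A minimal sketch, assuming a hypothetical single gzipped input file events.json.gz, of forcing a repartition right after reading a non-splittable file:
//Scala
// A single gzipped file is non-splittable, so it arrives as one partition.
val events = spark.read.json("data/sample/events.json.gz")
println(s"partitions on read = ${events.rdd.getNumPartitions}")
// Pay the shuffle cost once so the rest of the pipeline runs in parallel.
val parallelEvents = events.repartition(spark.sparkContext.defaultParallelism)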
10. Avoid user-defined functions (UDFs), as Spark cannot look inside user code to optimize it
Impact: CPU
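A minimal sketch, reusing df1 from the caching example above, contrasting a UDF with an equivalent built-in function:
//Scala
import spark.implicits._
import org.apache.spark.sql.functions.{udf, upper}
// UDF version: opaque to the Catalyst optimizer and adds serialization overhead.
val upperUdf = udf((name: String) => if (name == null) null else name.toUpperCase)
val viaUdf = df1.withColumn("name_upper", upperUdf($"name"))
// Built-in version: same result, but Catalyst can optimize and code-generate it.
val viaBuiltin = df1.withColumn("name_upper", upper($"name"))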
11. Use the mapPartitions() operation instead of map
Impact: CPU
//Scala
// For the map operation, word => (word, 1) is a function that is called for every element in the partition.
sc.textFile("README.md").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
If the transformation function is not simple, for example it has more than a couple of lines, opens database or network connections, or instantiates a complex library for the computation, prefer mapPartitions() over map. The function passed to mapPartitions() is called once per partition and receives the entire partition's data as an iterator. Unlike map, mapPartitions() avoids repeating the expensive setup described above for each record and significantly improves performance, as in the sketch below.
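A minimal sketch, assuming a hypothetical expensive-to-construct LookupClient, of doing per-partition setup with mapPartitions instead of per-record setup with map:
//Scala
// Hypothetical client that is costly to create (e.g. wraps a connection pool or a heavy parser).
class LookupClient extends Serializable {
  def enrich(word: String): String = word.toUpperCase // placeholder logic
}
val words = sc.textFile("README.md").flatMap(_.split(" "))
// With map, a new client per record would be prohibitively expensive.
// With mapPartitions, one client is created per partition and reused for every record in it.
val enriched = words.mapPartitions { partition =>
  val client = new LookupClient()
  partition.map(word => client.enrich(word))
}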
12. Check for connections that are not closed properly (HTTP, JDBC, TCP, etc.)
Impact: Memory
Unclosed connections cause memory and resource leaks that are difficult to trace and debug, and they are among the causes of frequent Spark job crashes. Add this to your code review checklist as a best practice. A good example here, and a defensive sketch follows below.
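A minimal sketch, assuming a hypothetical records RDD of (id, value) pairs and hypothetical JDBC connection details, of closing a per-partition connection in a finally block:
//Scala
import java.sql.DriverManager
records.foreachPartition { partition =>
  val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password")
  try {
    val stmt = conn.prepareStatement("INSERT INTO staging_table (id, value) VALUES (?, ?)")
    partition.foreach { case (id, value) =>
      stmt.setLong(1, id)
      stmt.setString(2, value)
      stmt.executeUpdate()
    }
    stmt.close()
  } finally {
    conn.close() // always release the connection, even if the writes fail
  }
}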
13. Always inspect the amount of memory occupied by data structures.
Impact: Memory, Shuffle
Always tune the data structures you use. Avoid the tendency to create deep, hierarchical POJOs and data structures, for instance a Customer entity class that contains an Address object, an array list of addresses, and so on.
Too many objects, nested too deep, create memory pressure on the cluster, causing OutOfMemory exceptions and frequent garbage collection pauses, which contribute to delays in processing.
Each Java object carries a minimum overhead in bytes. Deep hierarchical objects and data structures bloat memory and increase serialization/deserialization times. It might be best to use a single class with just a String attribute that contains JSON data. So measure the object size and make a careful, conscious decision about your choice of data structures, for example as sketched below.
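A minimal sketch, assuming hypothetical Address and Customer case classes, of estimating in-memory object size with Spark's SizeEstimator utility:
//Scala
import org.apache.spark.util.SizeEstimator
// Hypothetical nested data structures.
case class Address(street: String, city: String, zip: String)
case class Customer(name: String, addresses: Seq[Address])
val nested = Customer("Andy", Seq(Address("1 Main St", "Springfield", "00001")))
val flat = """{"name":"Andy","addresses":[{"street":"1 Main St","city":"Springfield","zip":"00001"}]}"""
// Compare the estimated footprint of the nested object graph with a flat JSON string.
println(s"nested object bytes = ${SizeEstimator.estimate(nested)}")
println(s"flat string bytes   = ${SizeEstimator.estimate(flat)}")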
14. Check for high Garbage Collection times
Impact: CPU, I/O
Garbage collection tuning is a complex process that requires Java expertise, and data engineers without a Java background may not have the necessary skill set; the best way is to get help from a Java expert with the analysis and optimization. While tuning requires Java expertise, spotting the problem is easy: the Spark UI clearly shows GC times, and anything beyond millisecond values is a candidate for investigation. More details on memory tuning here.
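As a starting point for deeper analysis, a sketch of enabling GC logging on the executors; the flags follow the Spark tuning guide and assume a JDK 8-style collector, and the option must be set before the application starts (e.g. via spark-submit --conf or the builder shown here):
//Scala
import org.apache.spark.sql.SparkSession
val sparkWithGcLogs = SparkSession.builder()
  .appName("gc-logging-example")
  .config("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  .getOrCreate()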
15. Checkpoint the Dataset to truncate the lineage
Impact: Memory
Truncating the logical plan of the Dataset is particularly useful in machine learning algorithms, which most often have iterative logic; it helps avoid an ever-growing logical plan. More details here. A sketch follows below.
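A minimal sketch, assuming a hypothetical iterative transformation over the employees data, of checkpointing periodically to truncate the lineage:
//Scala
import spark.implicits._
// Checkpoint files need a reliable directory (HDFS or a blob store on a real cluster).
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
var df = spark.read.json("data/sample/employees.json")
for (i <- 1 to 50) {
  df = df.withColumn("salary", $"salary" * 1.01) // hypothetical iterative update
  if (i % 10 == 0) {
    // Materializes the Dataset and truncates the ever-growing logical plan.
    df = df.checkpoint()
  }
}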
16. Don’t bring large datasets to the driver to write to the output
Impact: Memory, CPU
It is a common mistake that beginner Spark programmers make: data is collected back to the driver to generate the final output instead of being written directly from the executors. It is an inefficient way of generating output, and in many cases the driver crashes. Options to connect to external storage, blob stores, TCP, JDBC, etc. directly from the executors are available. Avoid the temptation of collecting data at the driver and writing single-threaded output.
There could be an argument against this design: executors can fail and rerun the same logic, which would result in duplicate writes, so programmers prefer to bring data to the driver and write to the database in a transaction via JDBC. That could be acceptable for updating maybe 10 or 15 records, but it is still a bad design, and there is a better pattern. For instance, executors can write to temporary or staging tables; even if executors are rerun, data is overwritten in the temp/staging tables and not the original tables. Once the parallel processing is completed, a single statement is issued from the driver to merge data from staging into the actual tables in one transaction, which databases handle very well, making this a better approach than bringing data to the driver and then writing to the database. A sketch follows below.
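A minimal sketch, assuming a hypothetical results DataFrame and hypothetical JDBC connection details, of writing from executors to a staging table instead of collecting to the driver:
//Scala
// Anti-pattern: results.collect() pulls everything to the driver before writing.
// Better: executors write in parallel to a staging table.
results.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://host/db")
  .option("dbtable", "staging_results")
  .option("user", "user")
  .option("password", "password")
  .mode("overwrite") // a rerun simply overwrites the staging table
  .save()
// Then, from the driver, issue a single merge/insert-select from staging_results
// into the final table inside one database transaction.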
17. Last but not least, watch out for the many Spark configuration settings that can have a significant impact. There are tons of them, and they vary across Spark versions, so keep an eye on the configuration settings; one example follows below.
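One illustrative example (a real setting, though the value shown is only a hypothetical starting point): spark.sql.shuffle.partitions defaults to 200, which is often a poor fit for very small or very large workloads:
//Scala
// Inspect and adjust the number of partitions used after shuffles in Spark SQL.
println(spark.conf.get("spark.sql.shuffle.partitions")) // 200 by default
// Hypothetical tuning for a small cluster/dataset; measure before and after changing it.
spark.conf.set("spark.sql.shuffle.partitions", "64")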
Summary
- Follow the 5-step approach for optimization; don’t tune aimlessly.
- The three fundamental blocks for tuning are CPU, Memory, and I/O.
- Know where and what to look for in the Spark UI.
- Understand the Apache Spark execution model.
- Follow the best practices
Thank you for reading this article — I hope you enjoyed it. Feel free to comment, share it and let me know what you think about it.
Disclaimer: All the opinions expressed are personal independent thoughts and not to be attributed to my current or previous employers.