Performance Metrics
We compute several high-level metrics that indicate potential problems, or, if you will, optimization opportunities.
Generally, the higher the score, the more likely or severe the problem is. Of course, because each job is different, we want to stress that these are potential problems. If you examine your job and decide it's doing what it should, it's fine to ignore that job's problem metrics in the future.
We find it useful to review the trends of the problem metrics. A large jump might indicate a regression that requires investigation.
Spill
The spill problem score shows the sum of the Spill (Disk) metric across all stages.
Each task of a Spark stage processes a given chunk of data. For example, for a join, each partition of each side of the join must be sorted. Ideally, the processing is done entirely in memory. If not enough execution memory is available, Spark can use disk as scratch space, and this metric shows exactly how much data had to be written to disk.
Spill is never desirable, and spilling large amounts of data can considerably slow down the job, especially if you're using cloud instances with network disks.
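The score itself is a plain sum. The sketch below assumes per-stage Spill (Disk) values have already been collected (for example, copied from the Spark UI); the `StageSpill` class and the sample numbers are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class StageSpill:
    stage_id: int
    spill_disk_bytes: int  # the stage's "Spill (Disk)" value from the Spark UI


def spill_score(stages: list[StageSpill]) -> int:
    """Spill problem score: Spill (Disk) summed over all stages, in bytes."""
    return sum(stage.spill_disk_bytes for stage in stages)


GIB = 1024 ** 3
# Hypothetical sample: two stages spilled, one did not.
stages = [StageSpill(3, 12 * GIB), StageSpill(7, 0), StageSpill(9, 30 * GIB)]
print(f"spill score: {spill_score(stages) / GIB:.1f} GiB")  # spill score: 42.0 GiB
```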
The recommended way to reduce spill is to decrease the size of partitions by increasing parallelism:
- If the operation that generates spill happens after a shuffle (so the Shuffle Read metric in the Spark UI is above zero), it is best to increase the spark.sql.shuffle.partitions configuration value.
- If spilling happens immediately when reading data (so the stage has zero Shuffle Read and non-zero Input Size), the source data has partitions that are too large, and it's recommended to adjust the producer of that data.
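The two cases above can be expressed as a small decision function. This is a sketch, not the advisor's actual logic: the 128 MiB target partition size and the "at least double" rule are hypothetical heuristics chosen only for illustration.

```python
import math
from dataclasses import dataclass


@dataclass
class StageMetrics:
    spill_disk_bytes: int    # "Spill (Disk)"
    shuffle_read_bytes: int  # "Shuffle Read"
    input_bytes: int         # "Input Size"


# Hypothetical target size of one shuffle partition; tune for your cluster.
TARGET_PARTITION_BYTES = 128 * 1024 ** 2


def spill_remedy(stage: StageMetrics, current_shuffle_partitions: int) -> str:
    """Pick the remedy described above, based on where the stage reads its data from."""
    if stage.spill_disk_bytes == 0:
        return "no spill"
    if stage.shuffle_read_bytes > 0:
        # Spill after a shuffle: more shuffle partitions means smaller partitions.
        # Hypothetical heuristic: at least double, or enough to reach the target size.
        by_size = math.ceil(stage.shuffle_read_bytes / TARGET_PARTITION_BYTES)
        suggested = max(2 * current_shuffle_partitions, by_size)
        return f"set spark.sql.shuffle.partitions to {suggested}"
    if stage.input_bytes > 0:
        # Spill right after reading: the source partitions themselves are too large.
        return "adjust the producer of the input data to write smaller partitions"
    return "no shuffle read or input: inspect the stage manually"
```

For example, a stage that spilled after reading 100 GiB of shuffle data with 200 shuffle partitions would get a suggestion of 800 partitions (100 GiB / 128 MiB).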
In some setups of Spark Performance Advisor, the spark.sql.shuffle.partitions configuration parameter is automatically adjusted using execution history to minimize spill.
Wasted task time
The wasted task time metric shows how much execution time was wasted due to job errors. When an application fails, all of its task time is considered wasted. For a successful application, we consider wasted the task time of all failed tasks, as well as of all attempts of a task except the very last one.
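The rule above can be sketched as follows. The `TaskAttempt` record is a hypothetical stand-in for whatever per-attempt data is available (for example, from Spark event logs); durations are in minutes.

```python
from dataclasses import dataclass


@dataclass
class TaskAttempt:
    task_id: int         # logical task; retries share the same id
    attempt: int         # 0 for the first attempt, 1 for the first retry, ...
    duration_min: float
    failed: bool


def wasted_task_time(attempts: list[TaskAttempt], app_succeeded: bool) -> float:
    if not app_succeeded:
        # A failed application wastes all of its task time.
        return sum(a.duration_min for a in attempts)
    # Find the last attempt number of every task.
    last_attempt: dict[int, int] = {}
    for a in attempts:
        last_attempt[a.task_id] = max(last_attempt.get(a.task_id, a.attempt), a.attempt)
    # Count failed attempts and every attempt that is not the last one.
    wasted = 0.0
    for a in attempts:
        if a.failed or a.attempt != last_attempt[a.task_id]:
            wasted += a.duration_min
    return wasted


attempts = [
    TaskAttempt(task_id=1, attempt=0, duration_min=5.0, failed=True),
    TaskAttempt(task_id=1, attempt=1, duration_min=4.0, failed=False),
    TaskAttempt(task_id=2, attempt=0, duration_min=3.0, failed=False),
]
```

With these attempts, a successful application wastes 5 minutes (the failed first attempt of task 1), while a failed application wastes all 12 minutes.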
Typical issues that result in wasted task time are:
- Outright code errors that prevent job execution.
- Loss of executor, which in turn causes partial loss of already prepared data and its subsequent recomputation. The two most common reasons are:
  - Spot interruption (when using spot instances). The "Executor removed: spot interruption" metric shows the number of executors removed due to spot interruption. Spot interruptions are caused by your environment and generally can't be solved at the job level. Instead, you might consider using different kinds of cloud instances.
  - Executor loss due to an out-of-memory error, shown in the "Executor removed: OOM" column, can usually be addressed by changing the application settings. To do that, determine the exact OOM reason by reviewing the Event Timeline section of the Jobs tab in the Spark UI.
    - Memory overhead OOM means that the executor process used more physical memory than was allocated to it, which in turn is almost always caused by an unexpected amount of native memory (outside the JVM heap) allocated either by native libraries or by the JVM itself. This condition can be diagnosed by a message saying
      The executor with id N exited with exit code 137(SIGKILL, possible container OOM) ... termination reason: **OOMKilled**
      In this case, you must change the values of spark.executor.memory and spark.executor.memoryOverhead to increase the memory overhead. Their sum must still be less than the allocatable memory in your environment.
    - Heap OOM means that the JVM has run out of heap storage. In that case, the Spark UI status for that executor will end with termination reason: Error, and the executor logs contain java.lang.OutOfMemoryError: Java heap space. This is a rare condition, and you can alleviate it by:
      - Decreasing the partition size, by increasing spark.sql.shuffle.partitions
      - Decreasing spark.executor.cores (the number of parallel tasks per executor)
      - Decreasing spark.memory.fraction
- Driver OOM error. Similarly, it can be caused by:
  - memoryOverhead OOM, where the driver log or Kubernetes pod status might contain
    Final state: FAILED (driver container failed with ExitCode: 137, Reason: OOMKilled)
    To address it, you can increase the driver memory overhead. The specific way to do it depends on how you run the Spark job.
  - heap OOM, where the driver log might contain java.lang.OutOfMemoryError: Java heap space. In this case, it's recommended to investigate the reason for the memory consumption. For example, it can be caused by collecting too much data on the driver. In rare cases, it can be caused by having too many stages and tasks. If no optimization is possible, you can increase the driver memory.
- Running out of disk space, which is indicated by the "No space left on device" message. As a rule of thumb, Spark job input should not be more than executor_count * executor_disk_size / 2.
- There might be accidental task failures, especially for tasks that interact with external services. They are retried, so some time is wasted, but it is rarely important.
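The diagnostics above can be turned into quick checks. The sketch below matches only the message fragments quoted in this section using plain substring tests, which is an assumption: real logs may word these errors differently. The two sizing helpers encode the memory-sum constraint and the disk rule of thumb; their names and units are hypothetical.

```python
def classify_failure(message: str) -> str:
    """Map an executor/driver failure message to a likely cause (simple substring checks)."""
    if "No space left on device" in message:
        return "disk space"
    # Container killed for exceeding its physical memory limit (exit code 137 / OOMKilled).
    if "OOMKilled" in message or "exit code 137" in message or "ExitCode: 137" in message:
        return "memory overhead OOM"
    if "java.lang.OutOfMemoryError: Java heap space" in message:
        return "heap OOM"
    return "unknown"


def memory_settings_fit(executor_memory_gib: float,
                        memory_overhead_gib: float,
                        allocatable_gib: float) -> bool:
    """spark.executor.memory + spark.executor.memoryOverhead must stay below allocatable memory."""
    return executor_memory_gib + memory_overhead_gib < allocatable_gib


def input_fits_disk(input_bytes: int, executor_count: int, executor_disk_bytes: int) -> bool:
    """Rule of thumb: input <= executor_count * executor_disk_size / 2."""
    return input_bytes <= executor_count * executor_disk_bytes / 2
```

For instance, 1 TiB of input with 20 executors of 100 GiB disk each fails the rule of thumb (the budget is 1000 GiB), while 25 such executors pass it.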
Unused executors
The unused executors metric shows what percentage of time the executors are running without actually executing any tasks.
More formally:
- We compute the maximum possible task time as total_executor_run_time * core_count.
- We decide there's inefficiency if task_time / max_possible_task_time < 0.4 and max_possible_task_time > 560 min.
- If so, we report the inefficiency score as max_possible_task_time - task_time.
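The three steps above can be sketched as one function. It assumes all times are in minutes and that core_count is the number of cores per executor (the source does not say which); returning 0.0 when nothing is reported is a hypothetical convention.

```python
def unused_executors_score(task_time_min: float,
                           total_executor_run_time_min: float,
                           core_count: int) -> float:
    # Maximum possible task time if every core were busy for the executors' whole lifetime.
    max_possible_task_time = total_executor_run_time_min * core_count
    # Report only large (> 560 min) and poorly utilized (< 40%) applications.
    # The size check comes first, which also avoids dividing by zero.
    if max_possible_task_time > 560 and task_time_min / max_possible_task_time < 0.4:
        return max_possible_task_time - task_time_min
    return 0.0  # hypothetical convention: 0 means "no inefficiency reported"
```

For example, 10 executors with 4 cores that each run for 60 minutes give 600 * 4 = 2,400 minutes of possible task time; if tasks used only 500 minutes, utilization is about 21% and the score is 1,900 minutes.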
The actual task time always falls short of 100% utilization of all executors. If it is considerably less, it means that for a long time, executors or some of their cores were not doing anything while still being reserved.
Typical reasons that can be optimized include:
- Executors are idle while some synchronous operations are performed on the driver. For example, a driver might collect() data and call an external API on each row. In that case, it's possible to either:
  - Perform the operations on executors, for example using DataFrame.foreach
  - Reduce the number of executors by decreasing the spark.dynamicAllocation.maxExecutors parameter.
- A large DataFrame is being saved using df.repartition(numPartitions = 1), usually to avoid creating many small files. In that case, it is recommended to dynamically compute the desired number of partitions from the size of the input data and the desired size of the Parquet files.
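A minimal way to compute that number is to divide the expected output size by the desired file size. Both the 256 MiB default and the output-to-input size ratio below are hypothetical knobs, not values from the advisor.

```python
import math


def output_partitions(input_bytes: int,
                      target_file_bytes: int = 256 * 1024 ** 2,
                      output_to_input_ratio: float = 1.0) -> int:
    """Number of partitions (and thus roughly files) to write instead of repartition(1).

    target_file_bytes: hypothetical desired Parquet file size.
    output_to_input_ratio: hypothetical expected output size relative to the input.
    """
    estimated_output_bytes = input_bytes * output_to_input_ratio
    # Never go below one partition, even for empty input.
    return max(1, math.ceil(estimated_output_bytes / target_file_bytes))
```

The result can then be passed to df.repartition(n) before writing, so that 10 GiB of input produces about 40 files of 256 MiB each rather than one huge file written by a single task.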
Skew
Skew is a classic problem where one or several partitions take considerably more time than the others, so the total job uptime is much higher than it would be if the work were evenly distributed.
The metric contains the sum of taskTimeMax - taskTime95Percentile across all stages, in minutes, but only for stages where the condition (taskTimeMax - taskTime95Percentile) > 5min and taskTimeMax/taskTime95Percentile > 10 holds. In other words, it counts only skew that can seriously affect the run time.
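The skew metric can be sketched as below, assuming each stage is given as a (taskTimeMax, taskTime95Percentile) pair in minutes. The ratio condition is rewritten as a multiplication so that a zero percentile does not cause a division error.

```python
def skew_score(stages) -> float:
    """Sum of taskTimeMax - taskTime95Percentile over qualifying stages, in minutes.

    stages: iterable of (task_time_max_min, task_time_p95_min) pairs.
    """
    total = 0.0
    for task_time_max, task_time_p95 in stages:
        gap = task_time_max - task_time_p95
        # "task_time_max > 10 * task_time_p95" is taskTimeMax / taskTime95Percentile > 10
        # rewritten without division, so a zero percentile does not raise an error.
        if gap > 5 and task_time_max > 10 * task_time_p95:
            total += gap
    return total
```

For stages (60, 3), (8, 2) and (20, 1.5), only the first and third qualify (the second has a ratio of just 4), giving 57 + 18.5 = 75.5 minutes.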
If skew is found, it is necessary to determine which data cause it. Often, it is caused by nulls or other placeholder values, such as UNKNOWN, in join/group columns. The salting technique can be used to address this problem.
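To show what salting does, here is a plain-Python sketch of a salted inner join on lists of (key, value) pairs; it is an illustration of the technique, not Spark code. Rows of the skewed side whose key is "hot" get a random salt, and matching rows of the other side are replicated once per salt, so the hot key is spread over several (key, salt) groups while the join result stays the same. The function name, parameters, and the choice of hot keys are hypothetical.

```python
import random
from collections import defaultdict


def salted_join(large, small, hot_keys, n_salts, seed=0):
    """Inner join of two lists of (key, value) pairs, salting the hot keys.

    large: the big, skewed side; its hot-key rows get a random salt.
    small: the other side; its hot-key rows are replicated once per salt.
    Returns (key, large_value, small_value) triples.
    """
    rng = random.Random(seed)
    # Spread each hot key over n_salts sub-keys; other keys keep salt 0.
    salted_large = [
        ((key, rng.randrange(n_salts) if key in hot_keys else 0), value)
        for key, value in large
    ]
    # Replicate hot-key rows of the small side so every salted sub-key finds its match.
    salted_small = defaultdict(list)
    for key, value in small:
        for salt in (range(n_salts) if key in hot_keys else [0]):
            salted_small[(key, salt)].append(value)
    # Join on (key, salt); in Spark, each salted sub-key can land in a different partition.
    return [
        (salted_key[0], large_value, small_value)
        for salted_key, large_value in salted_large
        for small_value in salted_small.get(salted_key, [])
    ]
```

In Spark, the same idea is usually expressed with an extra salt column, for example generated with rand() on the skewed side and produced with explode() over all salt values on the other side, and the join is done on the original key plus the salt.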
Starting with Spark 3, adaptive query execution (AQE) can often correct skew in joins automatically. However, it is limited, and skew can still happen in the presence of window functions, grouping, and partition coalescing.
App time problem
The app time problem field contains the number of Spark applications that took too long to execute. The exact condition right now is "App uptime" > 8h or "Total task time" > 1000h.
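The condition can be written as a predicate and counted over applications. This sketch assumes each application is given as an (app uptime, total task time) pair in hours; the function names are hypothetical.

```python
def app_time_problem(app_uptime_h: float, total_task_time_h: float) -> bool:
    """True if the application took too long: uptime > 8h or total task time > 1000h."""
    return app_uptime_h > 8 or total_task_time_h > 1000


def count_app_time_problems(apps) -> int:
    """apps: iterable of (app_uptime_h, total_task_time_h) pairs."""
    # Summing booleans counts the applications for which the predicate holds.
    return sum(app_time_problem(uptime, task_time) for uptime, task_time in apps)
```

Because both comparisons are strict, an application with exactly 8 hours of uptime and exactly 1000 hours of task time is not counted.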
While long-running jobs are sometimes necessary, they generally deserve investigation, because:
- It might be a result of hangs, or accidental processing of excessive data (for example, recomputing all history every day).
- A long-running job is fragile: a recoverable error might still cause a lot of wasted time. Therefore, long-running jobs are better split into smaller incremental jobs.
Input size
The input size problem field contains the number of Spark applications that read a lot of data. Specifically, the limit is 3TB.
Reading a lot of data is sometimes necessary, for example in a large backfill job. For regular applications, however, it's recommended to check whether reading all the data is necessary, or whether the job can be made incremental, reading just one day and appending the results to the previously computed tables.