Problem
Job fails with an ExecutorLostFailure error message.
ExecutorLostFailure (executor <1> exited caused by one of the running tasks) Reason: Executor heartbeat timed out after <148564> ms
Cause
The ExecutorLostFailure error message means one of the executors in the Apache Spark cluster has been lost. This is a generic error message which can have more than one root cause. In this article, we look at how to resolve the issue when the root cause is the executor running out of memory.
This issue can occur when an executor has more data to process than it has memory available. For example, if an executor in your cluster has 24 GB of memory and the cumulative size of the data for all tasks running on that executor is greater than 24 GB, the executor can run out of memory.
How do you determine if OOM is the reason for the executor getting lost?
- Open the Spark UI.
- Click Stages.
- Click Failed stages.
- Click the description that corresponds to the failed stage.
- Review the bottom of the stage details page.
- Sort the list of tasks on the error column.
The error messages describe why a specific task failed. If you see an error message that says out of memory, or a similar error such as java.lang.OutOfMemoryError, it means the task failed because the executor ran out of memory.
Solution
When an executor is failing due to running out of memory, you should review the following items.
Is there a data skew?
Check whether the data is equally distributed across executors, or if there is any skew in the data.
You can find this by checking the stage summary table on the stage details page of the Spark UI.
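You can also check the record distribution programmatically. The following is a minimal sketch, assuming df is the DataFrame feeding the failing stage; spark_partition_id() tags each row with the partition it belongs to.

```python
from pyspark.sql.functions import spark_partition_id

# Minimal sketch: count the records in each partition to spot obvious skew.
# `df` is assumed to be the DataFrame feeding the failing stage.
partition_counts = (
    df.withColumn("partition_id", spark_partition_id())
      .groupBy("partition_id")
      .count()
      .orderBy("count", ascending=False)
)

partition_counts.show(10)  # the largest partitions appear first
```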
If there is data skew and one executor is receiving more data than the others, you need to resolve the skew to prevent that executor from running out of memory.
In most cases Adaptive Query Execution (AQE) automatically detects data skew and resolves the issue. However, there are some edge cases where AQE may not detect data skew correctly. Please review Why didn’t AQE detect my data skew? (AWS | Azure | GCP) for more information.
If you are having trouble resolving data skew, you can try increasing the number of partitions or explicitly specifying skew hints, as explained in the How to specify skew hints in dataset and DataFrame-based join commands article.
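As a rough sketch of both approaches, assuming two DataFrames df and skewed_df joined on a placeholder column id (verify the exact hint syntax against the linked article):

```python
# Option 1: increase the number of shuffle partitions so each task handles less data.
spark.conf.set("spark.sql.shuffle.partitions", "400")  # default is 200

# Option 2: add an explicit skew hint on the skewed relation and join column.
# `skewed_df`, `df`, and the column `id` are placeholders for your own data.
result = skewed_df.hint("skew", "id").join(df, "id")
```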
A partition is considered skewed when both (partition size > skewedPartitionFactor * median partition size) and (partition size > skewedPartitionThresholdInBytes) are true.
For example, given a median partition size of 200 MB and the default skewedPartitionFactor value of 5, any partition larger than 1 GB (200 MB * 5) is considered skewed. In this example, a 900 MB partition would not be considered skewed with the default settings.
Now say your application code performs a lot of heavy transformations on the data (such as explode or a Cartesian join). A high number of such transformations can overwhelm the executor, even if the partition isn't normally considered skewed.
Using our example defaults, you may find that a 900 MB partition is too much to process successfully. If that is the case, reduce the skewedPartitionFactor value. Reducing it to 4 means the system considers any partition over 800 MB (200 MB * 4) as skewed and automatically assigns the appropriate skew hints.
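As a sketch, these Apache Spark AQE settings can be adjusted in your application; the values shown follow the example above and should be tuned for your own workload.

```python
# Make sure AQE and its skew join handling are enabled (they are on by default
# in recent Spark and Databricks Runtime versions).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# The default factor is 5; lowering it to 4 flags partitions larger than
# 4 x the median partition size (800 MB in the example above) as skewed.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "4")

# A partition must also exceed this absolute size to be considered skewed
# (256 MB is the default).
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
```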
Please review the AQE documentation on dynamically handling skew join (AWS | Azure | GCP) for more information.
Is the executor capable enough?
If data is evenly distributed across all executors and you still see out-of-memory errors, the executors do not have enough resources to handle the load you are trying to run.
Scale horizontally by increasing the number of workers, and/or scale vertically by selecting a Worker type with more memory when creating your clusters.
Is it a properly configured streaming job?
If there is no apparent data skew, but the executor is still getting too much data to process, you should use maxFilesPerTrigger and/or the trigger frequency settings to reduce the amount of data that is processed at any one time.
Reducing the load on the executors also reduces the memory requirement, at the expense of slightly higher latency. In exchange for the increase in latency, the streaming job processes events in a more controlled manner, and a steady flow of events is reliably processed with every micro-batch.
Please review the Optimize streaming transactions with .trigger article for more information. You should also review the Spark Structured Streaming Programming Guide documentation on input sources and triggers.
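As a minimal sketch of both controls for a file-based streaming source, where the paths, the source format, and input_schema are placeholders for your own job:

```python
# Limit how many new files each micro-batch reads.
stream_df = (
    spark.readStream
        .format("json")                      # placeholder source format
        .schema(input_schema)                # file sources require an explicit schema
        .option("maxFilesPerTrigger", 100)   # cap the files picked up per micro-batch
        .load("/path/to/input")
)

# Control how often micro-batches run with a processing-time trigger.
query = (
    stream_df.writeStream
        .format("delta")
        .option("checkpointLocation", "/path/to/checkpoint")
        .trigger(processingTime="1 minute")
        .start("/path/to/output")
)
```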
If you want to increase the processing speed, increase the number of executors in your cluster. You can also repartition the input streaming DataFrame so that the number of tasks is less than or equal to the number of cores in the cluster.
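For example, a sketch that repartitions the stream_df from the sketch above to match the cluster's core count (sparkContext.defaultParallelism is typically the total number of executor cores):

```python
# Repartition the input stream so the number of tasks matches the available cores.
num_cores = spark.sparkContext.defaultParallelism
stream_df = stream_df.repartition(num_cores)
```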