Job fails with ExecutorLostFailure error due to excessive garbage collection (GC)

Broadcast the smaller table instead of the larger one.

Written by Rajeev kannan Thangaiah

Last published at: January 31st, 2025

Problem

When trying to use broadcasting, you experience an ExecutorLostFailure error.

ExecutorLostFailure (executor <> exited caused by one of the running tasks) Reason: Executor heartbeat timed out after <> ms

For instructions on addressing this ExecutorLostFailure error message when it is caused by other out-of-memory (OOM) issues, refer to Job fails with ExecutorLostFailure due to “Out of memory” error.

Cause

This article covers how to address ExecutorLostFailure errors caused by excessive garbage collection on the executor when broadcasting is used.

Broadcasting an 8 GB table means that the table is sent to every executor. If the executor's memory is insufficient to hold both the broadcast data and its other tasks, the result can be excessive garbage collection (GC) or even an OOM error, causing the executor to crash. This memory overload can also lead to heartbeat timeouts, because the executor spends most of its time trying to manage memory and fails to respond to the driver's heartbeat checks.

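For illustration, the following minimal PySpark sketch shows the pattern that can trigger this failure. The table names are hypothetical, and spark refers to the session already available in a Databricks notebook.

from pyspark.sql.functions import broadcast

# Hypothetical tables: fact_df is assumed to be very large (around 8 GB).
fact_df = spark.table("sales_facts")
dim_df = spark.table("store_dim")

# Forcing a broadcast of the large side copies the full table to every
# executor, which can lead to heavy GC, OOM errors, and heartbeat timeouts.
joined = dim_df.join(broadcast(fact_df), "store_id")
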
Solution

Avoid data shuffling by broadcasting the smaller of the two tables or DataFrames being joined, as sketched below. The table is broadcast by the driver, which copies it to all worker nodes.

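As a sketch of this approach (the table names are hypothetical, and spark is the session already available in a Databricks notebook), broadcast the smaller DataFrame in the join:

from pyspark.sql.functions import broadcast

# Hypothetical tables: dim_df is the small dimension table, fact_df is large.
fact_df = spark.table("sales_facts")
dim_df = spark.table("store_dim")

# Broadcasting the smaller side copies only the small table to every executor,
# so the large table does not need to be broadcast or fully shuffled.
joined = fact_df.join(broadcast(dim_df), "store_id")
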
When executing joins, set autoBroadcastJoinThreshold to a value lower than 8 GB, preferably 1 GB, based on your cluster configuration. Alternatively, remove the parameter and let adaptive query execution (AQE) choose the best join strategy for your job.

spark.databricks.adaptive.autoBroadcastJoinThreshold  <your-threshold>
spark.sql.autoBroadcastJoinThreshold  <your-threshold>
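
For example, in a notebook you can adjust these settings at runtime with spark.conf.set, or remove your explicit overrides so AQE falls back to its defaults. The 1g value below is only an illustration; choose a threshold that fits your cluster.

# Example values only; tune the threshold for your workload and cluster.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1g")
spark.conf.set("spark.databricks.adaptive.autoBroadcastJoinThreshold", "1g")

# Alternatively, unset the overrides and let AQE pick the join strategy.
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")
spark.conf.unset("spark.databricks.adaptive.autoBroadcastJoinThreshold")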