Problem
When trying to use broadcasting, you experience an ExecutorLostFailure error.

ExecutorLostFailure (executor <> exited caused by one of the running tasks) Reason: Executor heartbeat timed out after <> ms
For instructions on addressing this ExecutorLostFailure error message due to other OOM issues, refer to Job fails with ExecutorLostFailure due to “Out of memory” error.
Cause
This issue is caused by excessive garbage collection on the executor, triggered by the use of broadcasting.
Broadcasting an 8GB table means that the table is sent to every executor. If the executor's memory is insufficient to handle both the broadcast data and other tasks, it can lead to excessive garbage collection (GC) or even an Out of Memory error (OOM), causing the executor to crash. This memory overload can result in heartbeat timeouts, as the executor spends most of its time trying to manage memory and fails to respond to the driver's heartbeat checks.
Solution
Avoid data shuffling by broadcasting the smaller of the two tables or DataFrames being joined. The table is broadcast by the driver, which copies it to all worker nodes.
When executing joins, set the autoBroadcastJoinThreshold to a value lower than 8GB (preferably 1GB or less), based on your cluster configuration. Alternatively, remove the parameter and let adaptive query execution (AQE) decide the best join strategy for your job for better performance.
spark.databricks.adaptive.autoBroadcastJoinThreshold <your-threshold>
spark.sql.autoBroadcastJoinThreshold <your-threshold>
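These thresholds can also be set at runtime from a notebook. A minimal sketch, assuming an active `spark` session; the 1GB value is illustrative and should be tuned to your cluster:

```python
# Lower both broadcast thresholds (value is illustrative; tune to your cluster)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1g")
spark.conf.set("spark.databricks.adaptive.autoBroadcastJoinThreshold", "1g")

# Or disable automatic broadcasting entirely and let AQE pick the join strategy
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```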