Unable to parallelize the code using the apply API from Pandas on PySpark

Directly use the apply function from pyspark.pandas without wrapping it in a lambda function.

Written by Amruth Ashoka

Last published at: January 29th, 2025

Problem

When applying a computation to a column of a Pandas on PySpark DataFrame, the execution does not happen in parallel.

 

Example

In the following code, the pandas API is imported from PySpark and a PySpark pandas DataFrame named df2 is defined. A function temp_function, which includes a return type hint, is applied to the 'col1' column using the apply method, and the result is stored in 'col3'.

 

%python

# This code runs on driver

import time
from pyspark import pandas as ps
import numpy as np

df2 = ps.DataFrame(np.random.choice([1,2,3], size=(3,3))).rename(columns={0: 'col1', 1: 'col2', 2:'col3'})
print(type(df2))

def temp_function(x) -> int:
    print("Hello")
    time.sleep(5)
    print("Func_end")
    return x

# Wrapping temp_function in a lambda discards its return type hint
df2['col3'] = df2['col1'].apply(lambda x: temp_function(x))

 

However, this code does not achieve parallel execution as expected. Instead, all operations are confined to the driver. 

 

In the following image, all print statements from temp_function appear directly in the console, rather than in the executor logs. Moreover, the execution is eager, with an Apache Spark job being triggered twice: first when the function is executed and again when the DataFrame is printed.
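
As a quick diagnostic (not part of the original example), you can print the hostname from inside the applied function. Based on the behavior described above, the lambda-wrapped apply prints the driver's hostname directly in the notebook console. The where_am_i helper below is hypothetical and only illustrates the check.

%python

import socket
import numpy as np
from pyspark import pandas as ps

df2 = ps.DataFrame(np.random.choice([1, 2, 3], size=(3, 3))).rename(columns={0: 'col1', 1: 'col2', 2: 'col3'})

def where_am_i(x) -> int:
    # Prints the hostname of the machine that actually runs the function.
    # When execution stays on the driver, the hostname appears in the console output.
    print(socket.gethostname())
    return x

df2['col3'] = df2['col1'].apply(lambda x: where_am_i(x))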

 

 

Cause

The lack of parallel execution stems from using the apply method with a Python lambda function, which cannot carry a return type annotation. Although temp_function is defined with a type hint, the lambda wrapper passed to apply does not relay this information to PySpark's execution engine.

Because PySpark cannot determine the return type up front, it does not distribute the execution across the cluster. Instead, it runs the function on the driver node, treating it as a plain Python function rather than a distributed operation, leading to the eager and non-parallel execution observed.
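
The missing annotation can be seen directly in plain Python, independent of Spark:

def temp_function(x) -> int:
    return x

wrapped = lambda x: temp_function(x)

print(temp_function.__annotations__)  # {'return': <class 'int'>}
print(wrapped.__annotations__)        # {} - the lambda carries no return type hint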

 

Solution

To ensure parallel execution of operations on a PySpark DataFrame, directly use the apply function from pyspark.pandas without wrapping it in a lambda function. 

This approach lets PySpark distribute the work across the cluster's executors. Because the return type is declared directly in the function definition (temp_function), it is read from the type hint rather than inferred by executing the function, which reduces execution overhead.

 

import time 
import numpy as np 
from pyspark import pandas as ps

df = ps.DataFrame(np.random.choice([1, 2, 3], size=(3, 3))).rename(columns={0: 'col1', 1: 'col2', 2: 'col3'}) 
print(type(df)) 

def temp_function(x) -> int:
    print("Hello")
    time.sleep(10)
    print("Func_end")
    return x

df['col3'] = df['col1'].apply(temp_function) 
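
If the function needs extra arguments, a lambda is still not required: Series.apply forwards positional args and keyword arguments to the function, so the return type hint stays visible to PySpark. The temp_function_with_offset helper and its offset parameter below are only for illustration.

def temp_function_with_offset(x, offset) -> int:
    # The return type hint on the named function is still read by pandas on Spark.
    time.sleep(10)
    return x + offset

# Extra arguments are passed through apply's args parameter instead of a lambda wrapper.
df['col3'] = df['col1'].apply(temp_function_with_offset, args=(1,))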

 

In the following image, the print statements within temp_function are now logged in the executor logs instead of the console, confirming the distributed nature of the task execution.