Problem
You notice your job runs for a long time but does not complete successfully. It gets stuck at an intermediate cell using a window function. The following code provides an example, where:
- 
<PARTITION_BY_COLS>is a list or tuple of columns defining each logical group. - 
<ORDER_BY_COLS>is a list or tuple of columns defining sort order within group. - 
<SELECT_COLS>is a list or tuple of columns to keep from the input df. - 
<AMOUNT_COL>is a single numeric column to be cumulatively summed. - 
<RN_ALIAS>is an output column name for row number. - 
<RUNSUM_ALIAS>is an output column name for running (cumulative) sum. 
from pyspark.sql import functions as F, Window
w = Window.partitionBy(*<PARTITION_BY_COLS>).orderBy(*<ORDER_BY_COLS>)
df_window = (
    df.select(*<SELECT_COLS>)
      .withColumn("<RN_ALIAS>", F.row_number().over(w))
      .withColumn(
          "<RUNSUM_ALIAS>",
          F.sum(F.col("<AMOUNT_COL>")).over(
              w.rowsBetween(Window.unboundedPreceding, Window.currentRow)
          )
      )
)
Cause
Each window partition (which defines how data is grouped for a window operation) is processed by a single executor. Correct evaluation requires all rows for that partition in order. For very large window partitions, this can cause memory pressure or require spilling to disk, which causes the job to stall or fail.
Solution
Use the groupBy function instead, to aggregate results and show per-partition results of the aggregate. 
After shuffle, each partition contains all the data for one or more group keys, allowing per-key aggregation without needing further communication. Each partition performs aggregation for its local keys, and then the results are combined as needed. The aggregation step is generally efficient if partitions are well-balanced and data is not skewed. The following code provides an example, where:
- 
<GROUP_BY_COLS>is a list or tuple of columns to group by. - 
<AMOUNT_COL>is a single numeric column to aggregate with sum. - 
<SUM_ALIAS>is an output column name for the summed metric. - 
<COUNT_ALIAS>is an output column name for the row count. 
from pyspark.sql import functions as F
df_grouped = (
    df.groupBy(*<GROUP_BY_COLS>)
      .agg(
          F.sum(F.col("<AMOUNT_COL>")).alias("<SUM_ALIAS>"),
          F.count(F.lit(1)).alias("<COUNT_ALIAS>")
      )
)