Problem
Your streaming job using the forEachBatch sink fails with a DUPLICATED_MAP_KEY error while executing batch DataFrame aggregation operations inside the forEachBatch function.
[ERROR] default_source: <microbatch-name> error Raw Stream Failed writing with mergeSchema to s3://XXX/ in epoch_id X on XXXX YYYY-MM-DD, Attempt to write without mergeSchema option. Error - An error occurred while calling o2464.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 39 in stage 141.0 failed 4 times, most recent failure: Lost task 39.3 in stage 141.0 (TID XXXX) (XX.XXX.XX.XX executor X): org.apache.spark.SparkRuntimeException: [DUPLICATED_MAP_KEY] Duplicate map key User was found, please check the input data.
If you want to remove the duplicated keys, you can set "spark.sql.mapKeyDedupPolicy" to "LAST_WIN" so that the key inserted at last takes precedence. SQLSTATE: 23505
The error still occurs despite:
- Setting the Apache Spark configuration spark.sql.mapKeyDedupPolicy to LAST_WIN, both within and outside the forEachBatch function (a sketch follows this list).
- Adding it as a key-value pair under Compute > Configuration > Advanced Spark config.
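For reference, the attempted configuration outside the function looks like the following minimal sketch, where spark is the driver-level session created at job startup. As explained under Cause, this is not the session that handles the micro-batch write, so the setting does not take effect there.

# Attempted fix on the driver-level session; this does not reach the session
# used inside forEachBatch, so the DUPLICATED_MAP_KEY error persists.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")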
Cause
The spark.sql.mapKeyDedupPolicy setting is being applied to the wrong Spark session object. The configuration must be set on the Spark session associated with the micro-batch DataFrame inside the forEachBatch function.
Solution
Use the following approach inside the forEachBatch function to ensure the deduplication policy is applied to the active session handling the streaming job.
def process_batch(df, epoch_id):
    # df is the micro-batch DataFrame. Set the dedup policy on the session
    # attached to this DataFrame, not on the driver-level session.
    spark = df.sparkSession
    spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
    df.write.format("<your-format>").save("<your-target-path>")
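For example, you can attach process_batch to the streaming query with foreachBatch. The following is a minimal sketch that assumes a pre-existing spark session (as in a Databricks notebook); the source format, source path, and checkpoint location are placeholders, not values from your job.

# Minimal sketch: wire process_batch into the streaming query.
streaming_df = spark.readStream.format("<your-source-format>").load("<your-source-path>")

(streaming_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "<your-checkpoint-path>")
    .start())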
For more information on upserting from streaming queries using foreachBatch, review the Delta table streaming reads and writes (AWS | Azure | GCP) documentation.