Problem
You attempt to add partitions to an external Delta table in parallel, issuing ALTER TABLE ADD PARTITION
commands from a thread pool. The following code provides an example.
%python
import os
from datetime import datetime
from dateutil.relativedelta import relativedelta
from multiprocessing.pool import ThreadPool

def grant_group_tables(partition_date):
    try:
        table_name = '<catalog-name>.<schema-name>.<table-name>'
        location = f"s3://<bucket-name>/partitions/data_day={partition_date}"
        spark.sql(f"alter table {table_name} add partition (data_day='{partition_date}') location '{location}'")
    except Exception as e:
        print(e)

if __name__ == '__main__':
    partition_date = '<yyyymmdd>'
    final_list = []
    # Build a list of 50 consecutive partition dates.
    for i in range(0, 50):
        final_list.append(datetime.strftime(datetime.strptime(partition_date, '%Y%m%d') + relativedelta(days=i), '%Y%m%d'))
    # Issue one ALTER TABLE statement per date, in parallel.
    pool = ThreadPool(os.cpu_count())
    res = pool.map(grant_group_tables, final_list)
    pool.terminate()
The operation fails with the following error.
[DELTA_CONCURRENT_APPEND] ConcurrentAppendException: Files were added to partition [data_day=<data-day>] by a concurrent update. Please try the operation again. Conflicting commit: {"timestamp":<timestamp>,"userId":"<user-id>","userName":"<user-name>","operation":"Manual Update","operationParameters":{},"notebook":{"notebookId":"<notebook-id>"},"clusterId":"<cluster-id>","readVersion":<version>,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{},"engineInfo":"Databricks-Runtime/15.3.x-photon-scala2.12","txnId":"<txn-id>"} Refer to https://docs.databricks.com/delta/concurrency-control.html for more details.
Cause
When multiple operations try to modify the same table at once, they interfere with each other. Each ALTER TABLE ADD PARTITION statement commits as a separate Delta transaction, and because these transactions are non-blind appends (each commit first reads the table state, as the "isBlindAppend":false field in the error shows), concurrent commits conflict with each other and cause the ConcurrentAppendException.
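The error message suggests retrying the operation. One stopgap is to wrap each statement in a retry loop so that conflicting commits are reattempted. The following is a minimal sketch, not part of the original example: the helper name add_partition_with_retry is hypothetical, and it detects the conflict by matching the DELTA_CONCURRENT_APPEND error class in the exception message. The cleaner fix, described in the Solution section, avoids the conflict entirely.
%python
import time

def add_partition_with_retry(sql_statement, max_retries=5):
    # Hypothetical helper: retry a statement that fails with a concurrency conflict.
    for attempt in range(max_retries):
        try:
            spark.sql(sql_statement)
            return
        except Exception as e:
            # Retry only on the concurrent-append conflict; re-raise anything else.
            if "DELTA_CONCURRENT_APPEND" in str(e) and attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # simple exponential backoff
            else:
                raise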
Solution
Instead of adding partitions in parallel, combine them into a single ALTER TABLE ADD PARTITION
command to add all partitions in one commit.
Example
%sql
ALTER TABLE <catalog-name>.<schema-name>.<table-name> ADD
PARTITION (data_day ='p1') LOCATION 'path1'
PARTITION (data_day ='p2') LOCATION 'path2'
PARTITION (data_day ='p3') LOCATION 'path3';
The following code demonstrates the correction applied to the example from the problem statement. It combines the partition clauses and runs a single ALTER TABLE command.
%python
from datetime import datetime
from dateutil.relativedelta import relativedelta

result = ""
partition_date = '<yyyymmdd>'
# Build five partition clauses; increase the range to add more partitions to the statement.
for i in range(1, 6):
    location = f"s3://<bucket-name>/partitions/data_day={partition_date}"
    formatted_string = f"partition (data_day='{partition_date}') location '{location}'"
    # Append the formatted clause to the result.
    result += formatted_string + " "
    # Advance to the next day's partition.
    partition_date = datetime.strftime(datetime.strptime(partition_date, '%Y%m%d') + relativedelta(days=1), '%Y%m%d')

# Add all partitions in one statement, and therefore in one commit.
sql_statement = f"alter table <catalog-name>.<schema-name>.<table-name> add {result}"
spark.sql(sql_statement)
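To confirm that the partitions were registered, you can list them afterward. A minimal check, using the same table name as above:
%sql
SHOW PARTITIONS <catalog-name>.<schema-name>.<table-name>;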