Today I noticed that some of my workflows were failing with the following error:
ConcurrentAppendException: [DELTA_CONCURRENT_APPEND] ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
The workflow consists of a For each task that runs the same notebook in parallel, once per test.
The problem is that the `saveAsTable` call appends to an unpartitioned table, `results_table_path`:
```python
# Write to delta table
(df.write
    .format("delta")
    .mode("append")
    .saveAsTable(results_table_path)
)
```
This means that when two or more tests finish at the same time and write to the results table simultaneously, their commits conflict and Delta raises a ConcurrentAppendException.
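To make the reasoning concrete, here is a deliberately simplified model of the conflict check, written in plain Python rather than against Delta itself. It is not Delta Lake's actual implementation: `AddedFile` and `conflicts` are hypothetical names, and the model assumes that each writing transaction depends on (has read) the part of the table it targets, which is the situation in which Delta reports this exception. In an unpartitioned table that scope is the whole table, so any file committed by another test run conflicts; when the scope is one `test_name` partition, files added to other partitions do not.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Optional


@dataclass(frozen=True)
class AddedFile:
    """A data file committed by a concurrent writer (hypothetical, simplified)."""
    path: str
    partition: Optional[str]  # test_name value, or None for an unpartitioned table


def conflicts(read_partitions: Optional[FrozenSet[str]],
              concurrent_adds: Iterable[AddedFile]) -> bool:
    """Toy model of optimistic conflict detection, NOT Delta's real checker.

    read_partitions: partitions the current transaction depends on;
    None means the whole table (the only option when the table is unpartitioned).
    Returns True if any concurrently added file falls inside that scope.
    """
    for f in concurrent_adds:
        if read_partitions is None or f.partition in read_partitions:
            return True
    return False


# Unpartitioned: test_b's file lands in the table root, which test_a depends on.
print(conflicts(None, [AddedFile("part-0001.parquet", None)]))  # True

# Partitioned by test_name: test_a only depends on its own partition.
print(conflicts(frozenset({"test_a"}),
                [AddedFile("test_name=test_b/part-0001.parquet", "test_b")]))  # False

# Two writers targeting the same partition would still conflict.
print(conflicts(frozenset({"test_a"}),
                [AddedFile("test_name=test_a/part-0002.parquet", "test_a")]))  # True
```

The last case is worth keeping in mind: partitioning only helps as long as no two parallel runs share a `test_name`.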
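One way to resolve this is to partition the table by test, i.e. by the `test_name` column in my case. Each notebook run then only adds files to its own `test_name` partition, so parallel runs no longer add files to the same part of the table. Note that an append cannot change the partitioning of an existing table: if the results table already exists unpartitioned, it has to be recreated with the new partitioning first.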
```python
# Write to delta table
(df.write
    .partitionBy("test_name")  # Prevent ConcurrentAppendException
    .format("delta")
    .mode("append")
    .saveAsTable(results_table_path)
)
```
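The error message itself says "Please try the operation again", so a retry with backoff is a possible complementary safeguard, for example if two runs ever share a `test_name`. The sketch below is an assumption, not part of the fix described here: `retry_on_concurrent_append` and `is_concurrent_append` are hypothetical helpers, and recognising the conflict by matching the exception's class name or the `DELTA_CONCURRENT_APPEND` error class in its message is a heuristic. The open-source delta-spark package also exposes `delta.exceptions.ConcurrentAppendException`, which could be caught directly; matching on text keeps this sketch free of Spark dependencies.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def is_concurrent_append(exc: BaseException) -> bool:
    """Heuristic (assumption): detect the conflict by class name or error class text."""
    text = f"{type(exc).__name__}: {exc}"
    return "ConcurrentAppendException" in text or "DELTA_CONCURRENT_APPEND" in text


def retry_on_concurrent_append(
    write: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call write(), retrying with jittered exponential backoff on concurrent appends."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return write()
        except Exception as exc:
            # Re-raise unrelated errors immediately, and give up after the last attempt.
            if attempt == max_attempts or not is_concurrent_append(exc):
                raise
            # Wait 1x, 2x, 4x, ... base_delay plus random jitter,
            # so parallel runs do not all retry at the same moment.
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, base_delay)
            sleep(delay)
    raise AssertionError("unreachable")
```

In the notebook, the write would be wrapped in a lambda, e.g. `retry_on_concurrent_append(lambda: df.write.format("delta").mode("append").saveAsTable(results_table_path))`. Retrying only helps if the conflict is transient; partitioning removes the overlap between parallel runs in the first place.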