Delta Lake is a powerful storage layer that brings reliability and performance to your data lake. One of its standout features is schema evolution, which allows you to handle changing schemas in a seamless manner. In this article, we will explore schema evolution in Delta Lake with practical examples to demonstrate how to add, modify, and drop columns in a Delta table.
What is Schema Evolution?
Schema evolution in Delta Lake refers to the ability to accommodate changes in the table schema, such as adding or modifying columns, while maintaining backward compatibility with existing data. This is especially useful when dealing with dynamic or semi-structured data sources where schema changes are inevitable.
Delta Lake ensures that schema changes are managed in an atomic and consistent manner by leveraging its transactional log. The transactional log records all changes, including schema modifications, as atomic units, ensuring that schema updates are either fully applied or completely rolled back in case of failure. This guarantees data consistency and reliability while preserving the ACID properties of the table.
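To make the role of the log concrete, here is a minimal pure-Python sketch of the idea. It is not Delta Lake's actual implementation. It assumes a toy log in which each commit is one JSON-lines file named by a zero-padded version number, and a simplified metaData action that holds a plain columns list (the real log stores a full schema string). The helper names commit and current_schema and the data file names are hypothetical.

```python
import json
import os
import tempfile


def commit(log_dir, version, actions):
    """Write one commit as a single JSON-lines file; fail if the version exists."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    # Mode "x" creates the file only if it does not exist yet, so two writers
    # can never both claim the same version number in this toy model.
    with open(path, "x") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")
    return path


def current_schema(log_dir):
    """Return the column list from the most recent metaData action."""
    schema = None
    for name in sorted(os.listdir(log_dir)):
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "metaData" in action:
                    schema = action["metaData"]["columns"]
    return schema


log_dir = tempfile.mkdtemp()

# Version 0: initial schema plus the first data file (hypothetical file name).
commit(log_dir, 0, [{"metaData": {"columns": ["Name", "Age"]}},
                    {"add": {"path": "part-0.parquet"}}])

# Version 1: the schema change and the new data file land in the same commit,
# so a reader sees either both or neither.
commit(log_dir, 1, [{"metaData": {"columns": ["Name", "Age", "Gender", "Country"]}},
                    {"add": {"path": "part-1.parquet"}}])

# A second writer trying to claim version 1 is rejected.
try:
    commit(log_dir, 1, [{"metaData": {"columns": ["Name"]}}])
    conflict = False
except FileExistsError:
    conflict = True

print(current_schema(log_dir))
print(conflict)
```

Exclusive file creation here only models the rule that a version number can be claimed once. It does not protect against a crash midway through writing a commit file, which a real storage layer has to handle.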
Step-by-Step Example
We will demonstrate schema evolution with the following steps:
Creating a Delta Table
Adding New Columns and Appending Data
Dropping Columns
Time Travel
1. Creating a Delta Table
Let’s start by creating a simple Delta table with initial data. You can use Databricks Canvas to display the initial data as a table:
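from pyspark.sql import SparkSession
# Databricks notebooks predefine `spark`; getOrCreate() reuses it or starts a session elsewhere
spark = SparkSession.builder.getOrCreate()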
# Sample data and schema
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["Name", "Age"]
table_name = "test_catalog.default.schema_evolution_test"
# Create a DataFrame and write it to a Delta table
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("append").saveAsTable(table_name)
The table now holds 3 rows and 2 columns, as expected:
spark.table(table_name).show()
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|Cathy| 29|
| Bob| 45|
+-----+---+
2. Adding New Columns and Appending Data
Next, let’s evolve the schema by adding two new columns, Gender and Country, to the table.
from pyspark.sql.functions import lit
# Add new columns to the DataFrame
df = df.withColumns({"Gender": lit("Unknown"), "Country": lit("USA")})
# Append to the Delta table without enabling schema evolution (this write fails)
df.write.format("delta").mode("append").saveAsTable(table_name)
This write fails with the following error:
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.
Table schema:
root
-- Name: string (nullable = true)
-- Age: long (nullable = true)
Data schema:
root
-- Name: string (nullable = true)
-- Age: long (nullable = true)
-- Gender: string (nullable = true)
-- Country: string (nullable = true)
If we add the mergeSchema option as suggested, the write succeeds:
df.write.format("delta").option("mergeSchema", "true").mode("append").saveAsTable(table_name)
spark.table(table_name).show()
+-----+---+-------+-------+
|Name |Age|Gender |Country|
+-----+---+-------+-------+
|Alice|34 |Unknown|USA |
|Cathy|29 |Unknown|USA |
|Bob |45 |Unknown|USA |
|Alice|34 |NULL |NULL |
|Cathy|29 |NULL |NULL |
|Bob |45 |NULL |NULL |
+-----+---+-------+-------+
The first write left the table with just 3 rows and 2 columns. This second write adds 2 new columns and appends 3 new rows that carry values for Gender and Country. Delta’s schema evolution updates the table schema to include the new columns, and the existing rows, which were written before those columns existed, show null for them.
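The two behaviours seen so far, rejecting an unexpected column by default and merging it when asked, can be sketched in plain Python. This is a simplified model under stated assumptions, not Delta's code: schemas are ordered lists of (name, type) pairs, any type difference is rejected (Delta itself permits certain type changes), and the function names resolve_schema and align_row are hypothetical.

```python
def resolve_schema(table_schema, data_schema, merge_schema=False):
    """Return the schema the table has after an append.

    Both arguments are ordered lists of (column_name, type_name) pairs.
    """
    table_types = dict(table_schema)
    merged = list(table_schema)
    for name, type_name in data_schema:
        if name not in table_types:
            if not merge_schema:
                raise ValueError(f"schema mismatch: unexpected column {name!r}")
            merged.append((name, type_name))  # new columns go at the end
        elif table_types[name] != type_name:
            raise ValueError(f"type mismatch for column {name!r}")
    return merged


def align_row(row, schema):
    """Fill columns missing from a row with None (displayed as NULL by Spark)."""
    return {name: row.get(name) for name, _ in schema}


table_schema = [("Name", "string"), ("Age", "long")]
data_schema = table_schema + [("Gender", "string"), ("Country", "string")]

# Without merge_schema the append is rejected, like the error shown earlier.
try:
    resolve_schema(table_schema, data_schema)
    rejected = False
except ValueError:
    rejected = True

# With merge_schema the new columns are added to the table schema.
merged = resolve_schema(table_schema, data_schema, merge_schema=True)

# A row written before the change reads back with None in the new columns.
old_row = align_row({"Name": "Alice", "Age": 34}, merged)
print(rejected, merged, old_row)
```

Old data is not rewritten in this model. The missing values are filled in when the row is read against the newer schema, which matches the NULL values in the output above.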
What if we wanted just the Gender and Country columns added to our existing table with 3 rows and 2 columns, like below?
+-----+---+-------+-------+
|Name |Age|Gender |Country|
+-----+---+-------+-------+
|Alice|34 |Unknown|USA |
|Cathy|29 |Unknown|USA |
|Bob |45 |Unknown|USA |
+-----+---+-------+-------+
To do that, we need an overwrite operation combined with the overwriteSchema option:
df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(table_name)
So Delta gives us two options: merge new columns into the existing schema, or overwrite the schema.
3. Dropping Columns
Delta Lake does not natively support dropping columns directly through a write operation. A common workaround is to create a new DataFrame without the unwanted columns and then either overwrite the existing table or append the modified data. Let’s start with the append:
# Remove the Gender column from the DataFrame
df = df.drop("Gender")
# Append the DataFrame to the Delta table; the table itself keeps its Gender column
df.write.format("delta").option("mergeSchema", "true").mode("append").saveAsTable(table_name)
This appends 3 more rows whose Gender is NULL, because the DataFrame being written no longer has a Gender column while the table schema still does.
+-----+---+-------+-------+
| Name|Age| Gender|Country|
+-----+---+-------+-------+
|Alice| 34|Unknown| USA|
|Cathy| 29|Unknown| USA|
| Bob| 45|Unknown| USA|
|Alice| 34| NULL| USA|
|Cathy| 29| NULL| USA|
| Bob| 45| NULL| USA|
+-----+---+-------+-------+
If we want the Gender column removed from the table altogether, we should use the overwrite mode instead. Let’s see.
# Remove the Gender column (a no-op if it was already dropped above)
df = df.drop("Gender")
# Overwrite both the data and the schema of the table
df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(table_name)
spark.table(table_name).show()
The output now contains only Name, Age and Country; the Gender column is gone:
+-----+---+-------+
| Name|Age|Country|
+-----+---+-------+
|Alice| 34| USA|
|Cathy| 29| USA|
| Bob| 45| USA|
+-----+---+-------+
When to use mergeSchema and overwriteSchema
mergeSchema: Use this option when you want to add new columns to the existing schema without losing the existing data (incremental addition).
overwriteSchema: Use this option when you want to replace the schema entirely and are okay with overwriting the data (starting afresh).
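As a rough illustration of that rule of thumb, the decision can be written as a small helper. This is only a sketch: choose_write_options is a hypothetical function, not part of any Spark or Delta API, and it compares column names only, ignoring type changes.

```python
def choose_write_options(table_columns, incoming_columns, replace_data):
    """Pick a write mode and schema option following the rule of thumb above."""
    new_columns = [c for c in incoming_columns if c not in table_columns]
    if replace_data:
        # Starting afresh: replace the data, and the schema too if it differs.
        options = {"mode": "overwrite"}
        if list(incoming_columns) != list(table_columns):
            options["overwriteSchema"] = "true"
        return options
    # Incremental addition: keep existing data, merge in any new columns.
    options = {"mode": "append"}
    if new_columns:
        options["mergeSchema"] = "true"
    return options


add_columns = choose_write_options(
    ["Name", "Age"], ["Name", "Age", "Gender", "Country"], replace_data=False)
drop_column = choose_write_options(
    ["Name", "Age", "Gender", "Country"], ["Name", "Age", "Country"], replace_data=True)
print(add_columns)
print(drop_column)
```

The returned dictionary mirrors the mode and option values used in the write calls earlier in this article.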
4. Time Travel
We can imagine a scenario where we need to check the state of a table as it existed on a specific date—for instance, "the day when we removed certain columns." Such scenarios might arise while debugging, validating data before and after a deployment, or analyzing historical changes for compliance or auditing purpose.
Delta Lake's Time Travel feature enables us to query and inspect the state of the table at a specific point in time or at a specific version. By leveraging the Delta Lake transaction log, we can seamlessly look back at the history of operations performed on the table, providing a robust mechanism for understanding and troubleshooting past states.
We have two options to go back in time and query the data: by version or by timestamp.
df = (
spark
.read
.format("delta")
.option("versionAsOf", 0) # Version of the dataset
.table(table_name)
)
Or we can use a timestamp, since identifying the right version number is often impractical when the table has seen many write operations.
df = (
spark
.read
.format("delta")
.option("timestampAsOf", '2024-12-06T21:26:12.000+00:00') # Timestamp of the dataset
.table(table_name)
)
If we want to see the entire table history, we can use DESCRIBE HISTORY:
spark.sql(f"DESCRIBE HISTORY {table_name}").select("version", "timestamp", "operation").show(10, False)
+-------+-------------------+---------------------------------+
|version|timestamp |operation |
+-------+-------------------+---------------------------------+
|2 |2024-12-06 21:30:13|WRITE |
|1 |2024-12-06 21:26:26|CREATE OR REPLACE TABLE AS SELECT|
|0 |2024-12-06 21:26:12|CREATE TABLE AS SELECT |
+-------+-------------------+---------------------------------+
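The history output above also suggests how a timestamp can be mapped to a version: pick the latest commit made at or before the requested time. The sketch below shows that lookup in plain Python using the three commits listed above. It is an illustration of the idea, not Delta's resolution code, and version_as_of is a hypothetical helper. How Delta treats timestamps that fall outside the recorded history may differ from this toy.

```python
from bisect import bisect_right
from datetime import datetime

# (version, commit timestamp) pairs copied from the DESCRIBE HISTORY output.
HISTORY = [
    (0, datetime(2024, 12, 6, 21, 26, 12)),
    (1, datetime(2024, 12, 6, 21, 26, 26)),
    (2, datetime(2024, 12, 6, 21, 30, 13)),
]


def version_as_of(history, timestamp):
    """Return the latest version committed at or before `timestamp`."""
    commit_times = [ts for _, ts in history]
    index = bisect_right(commit_times, timestamp)
    if index == 0:
        raise ValueError("timestamp is before the first commit")
    return history[index - 1][0]


# The exact commit time of version 0 resolves to version 0.
at_first_commit = version_as_of(HISTORY, datetime(2024, 12, 6, 21, 26, 12))
# A time between versions 1 and 2 resolves to version 1.
between_commits = version_as_of(HISTORY, datetime(2024, 12, 6, 21, 28, 0))
print(at_first_commit, between_commits)
```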
Version 0 is the first version, written at 2024-12-06 21:26:12. It captures the table's initial state, with just 3 rows and 2 columns:
df.show()
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|Cathy| 29|
| Bob| 45|
+-----+---+
Under the hood, Delta reads the .json commit files in the table's _delta_log directory to decide which data files to include, and which to ignore, for the requested version.
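A minimal sketch of that selection, assuming a heavily simplified log: each commit is a list of add and remove actions that carry only a file path, and the data file names are hypothetical. Real commit files hold more action types and more fields. The three commits mirror the history shown earlier (create, replace, append).

```python
def files_at_version(log, version):
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for commit_version in sorted(log):
        if commit_version > version:
            break  # later commits are ignored when reading an older version
        for action in log[commit_version]:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


LOG = {
    # Version 0: the table is created with its first data file.
    0: [{"add": {"path": "part-0.parquet"}}],
    # Version 1: an overwrite removes the old file and adds a new one.
    1: [{"remove": {"path": "part-0.parquet"}}, {"add": {"path": "part-1.parquet"}}],
    # Version 2: an append adds another file.
    2: [{"add": {"path": "part-2.parquet"}}],
}

print(files_at_version(LOG, 0))
print(files_at_version(LOG, 2))
```

In this model an overwrite only marks the old file as removed in the log, so the file is still there to be read when version 0 is requested.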
Benefits of Schema Evolution in Delta Lake
Flexibility: Allows dynamic schema changes without requiring a full table rewrite.
Backward Compatibility: Ensures that existing data remains intact and accessible.
ACID Compliance: Maintains transactional consistency during schema changes.
Simplified Data Management: Handles schema changes seamlessly with minimal code.
If we use Structured Streaming, the table design calls for a different approach. I will cover that in the next article.
Community
This article is written by Chanukya Pekala for the community called DataTribe Collective