Python APIs for using Delta Lake with Apache Spark
—
Advanced upsert functionality for Delta Lake tables supporting complex merge patterns with conditional logic. Enables efficient data synchronization, CDC operations, and schema evolution during merge operations.
Start a merge operation by specifying source data and join conditions.
class DeltaTable:
def merge(
self,
source: DataFrame,
condition: Union[str, Column]
) -> DeltaMergeBuilder:
"""
Merge data from source DataFrame based on condition.
Parameters:
- source: Source DataFrame to merge
- condition: Join condition as SQL string or Column expression
Returns:
DeltaMergeBuilder for configuring merge actions
"""class DeltaTable {
def merge(source: DataFrame, condition: String): DeltaMergeBuilder
def merge(source: DataFrame, condition: Column): DeltaMergeBuilder
}

Configure actions for rows that match the merge condition.
class DeltaMergeBuilder:
def whenMatchedUpdate(
self,
condition: Optional[Union[str, Column]] = None,
set: Optional[Dict[str, Union[str, Column]]] = None
) -> DeltaMergeBuilder:
"""
Update matched rows with optional additional condition.
Parameters:
- condition: Optional additional condition for update
- set: Column mappings for update (required)
Returns:
DeltaMergeBuilder for method chaining
"""
def whenMatchedUpdateAll(
self,
condition: Optional[Union[str, Column]] = None
) -> DeltaMergeBuilder:
"""
Update all columns of matched rows with source values.
Parameters:
- condition: Optional condition for update
Returns:
DeltaMergeBuilder for method chaining
"""
def whenMatchedDelete(
self,
condition: Optional[Union[str, Column]] = None
) -> DeltaMergeBuilder:
"""
Delete matched rows with optional condition.
Parameters:
- condition: Optional condition for deletion
Returns:
DeltaMergeBuilder for method chaining
"""class DeltaMergeBuilder {
def whenMatchedUpdate(set: Map[String, Column]): DeltaMergeBuilder
def whenMatchedUpdate(condition: Column, set: Map[String, Column]): DeltaMergeBuilder
def whenMatchedUpdateAll(): DeltaMergeBuilder
def whenMatchedUpdateAll(condition: Column): DeltaMergeBuilder
def whenMatchedDelete(): DeltaMergeBuilder
def whenMatchedDelete(condition: Column): DeltaMergeBuilder
}

Configure actions for source rows that don't match any target rows.
class DeltaMergeBuilder:
def whenNotMatchedInsert(
self,
condition: Optional[Union[str, Column]] = None,
values: Optional[Dict[str, Union[str, Column]]] = None
) -> DeltaMergeBuilder:
"""
Insert unmatched source rows with optional condition.
Parameters:
- condition: Optional condition for insertion
- values: Column mappings for insert (required)
Returns:
DeltaMergeBuilder for method chaining
"""
def whenNotMatchedInsertAll(
self,
condition: Optional[Union[str, Column]] = None
) -> DeltaMergeBuilder:
"""
Insert all columns from unmatched source rows.
Parameters:
- condition: Optional condition for insertion
Returns:
DeltaMergeBuilder for method chaining
"""class DeltaMergeBuilder {
def whenNotMatchedInsert(values: Map[String, Column]): DeltaMergeBuilder
def whenNotMatchedInsert(condition: Column, values: Map[String, Column]): DeltaMergeBuilder
def whenNotMatchedInsertAll(): DeltaMergeBuilder
def whenNotMatchedInsertAll(condition: Column): DeltaMergeBuilder
}

Configure actions for target rows that don't match any source rows.
class DeltaMergeBuilder:
def whenNotMatchedBySourceUpdate(
self,
condition: Optional[Union[str, Column]] = None,
set: Optional[Dict[str, Union[str, Column]]] = None
) -> DeltaMergeBuilder:
"""
Update target rows not matched by source.
Parameters:
- condition: Optional condition for update
- set: Column mappings for update (required)
Returns:
DeltaMergeBuilder for method chaining
"""
def whenNotMatchedBySourceDelete(
self,
condition: Optional[Union[str, Column]] = None
) -> DeltaMergeBuilder:
"""
Delete target rows not matched by source.
Parameters:
- condition: Optional condition for deletion
Returns:
DeltaMergeBuilder for method chaining
"""class DeltaMergeBuilder {
def whenNotMatchedBySourceUpdate(set: Map[String, Column]): DeltaMergeBuilder
def whenNotMatchedBySourceUpdate(condition: Column, set: Map[String, Column]): DeltaMergeBuilder
def whenNotMatchedBySourceDelete(): DeltaMergeBuilder
def whenNotMatchedBySourceDelete(condition: Column): DeltaMergeBuilder
}

Enable automatic schema evolution during merge operations.
class DeltaMergeBuilder:
def withSchemaEvolution(self) -> DeltaMergeBuilder:
"""
Enable schema evolution for merge operation.
Returns:
DeltaMergeBuilder with schema evolution enabled
"""class DeltaMergeBuilder {
def withSchemaEvolution(): DeltaMergeBuilder
}

Execute the configured merge operation.
class DeltaMergeBuilder:
def execute(self) -> DataFrame:
"""
Execute the merge operation.
Returns:
DataFrame with merge operation metrics
"""class DeltaMergeBuilder {
def execute(): DataFrame
}

# Source data with updates and new records
updates_df = spark.createDataFrame([
(1, "Alice", "Engineering", 95000),
(2, "Bob", "Marketing", 75000),
(5, "Eve", "Sales", 70000) # New record
], ["id", "name", "department", "salary"])
# Perform upsert merge
delta_table.alias("employees").merge(
updates_df.alias("updates"),
"employees.id = updates.id"
).whenMatchedUpdate(set={
"name": "updates.name",
"department": "updates.department",
"salary": "updates.salary"
}).whenNotMatchedInsert(values={
"id": "updates.id",
"name": "updates.name",
"department": "updates.department",
"salary": "updates.salary"
}).execute()from pyspark.sql.functions import col, lit, current_timestamp
delta_table.alias("target").merge(
source_df.alias("source"),
"target.customer_id = source.customer_id"
).whenMatchedUpdate(
condition=col("source.status") == "active",
set={
"balance": col("source.balance"),
"last_updated": current_timestamp(),
"status": lit("updated")
}
).whenMatchedDelete(
condition=col("source.status") == "deleted"
).whenNotMatchedInsert(
condition=col("source.balance") > 0,
values={
"customer_id": col("source.customer_id"),
"balance": col("source.balance"),
"status": lit("new"),
"created_at": current_timestamp()
}
).whenNotMatchedBySourceUpdate(
set={"status": lit("inactive")}
).execute()

# Enable schema evolution to handle new columns
delta_table.merge(
source_with_new_columns_df,
"target.id = source.id"
).withSchemaEvolution().whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Notes:
- whenMatched clauses: Only the last one can omit the condition
- whenNotMatchedBySource clauses: Only the last one can omit the condition

Install with Tessl CLI
npx tessl i tessl/pypi-delta-spark