CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-delta-spark

Python APIs for using Delta Lake with Apache Spark

Pending
Overview
Eval results
Files

docs/merge-operations.md

Merge Operations

Advanced upsert functionality for Delta Lake tables, supporting complex merge patterns with conditional logic. It enables efficient data synchronization, change data capture (CDC) processing, and schema evolution during merge operations.

Capabilities

Merge Initialization

Start a merge operation by specifying source data and join conditions.

class DeltaTable:
    def merge(self, source: DataFrame, condition: Union[str, Column]) -> DeltaMergeBuilder:
        """Start a merge of ``source`` into this table.

        Args:
            source: DataFrame holding the rows to merge.
            condition: Join condition used to match source and target rows,
                given either as a SQL string or as a Column expression.

        Returns:
            A DeltaMergeBuilder on which the merge actions are configured.
        """
        ...
// Scala-style signature listing for merge: the join condition is accepted
// either as a String or as a Column, and both overloads return a
// DeltaMergeBuilder.
class DeltaTable {
  def merge(source: DataFrame, condition: String): DeltaMergeBuilder
  def merge(source: DataFrame, condition: Column): DeltaMergeBuilder
}

Matched Row Actions

Configure actions for rows that match the merge condition.

class DeltaMergeBuilder:
    def whenMatchedUpdate(
        self,
        condition: Optional[Union[str, Column]] = None,
        set: Optional[Dict[str, Union[str, Column]]] = None,
    ) -> DeltaMergeBuilder:
        """Add an update action for rows that match the merge condition.

        Args:
            condition: Extra condition a matched row must also satisfy for
                the update to apply. Optional.
            set: Mapping from column name to its new value, each value given
                as a SQL string or a Column. Required, even though the
                signature defaults it to None.

        Returns:
            The DeltaMergeBuilder, so further clauses can be chained.
        """
        ...

    def whenMatchedUpdateAll(
        self,
        condition: Optional[Union[str, Column]] = None,
    ) -> DeltaMergeBuilder:
        """Add an action that overwrites every column of a matched row with the source values.

        Args:
            condition: Extra condition a matched row must also satisfy for
                the update to apply. Optional.

        Returns:
            The DeltaMergeBuilder, so further clauses can be chained.
        """
        ...

    def whenMatchedDelete(
        self,
        condition: Optional[Union[str, Column]] = None,
    ) -> DeltaMergeBuilder:
        """Add a delete action for rows that match the merge condition.

        Args:
            condition: Extra condition a matched row must also satisfy for
                the delete to apply. Optional.

        Returns:
            The DeltaMergeBuilder, so further clauses can be chained.
        """
        ...
// Scala-style signature listing for the matched-row actions: each action is
// shown with and without an extra Column condition.
// NOTE(review): the io.delta.tables Scala API is believed to expose these as
// whenMatched(...).update(...) / updateAll() / delete() rather than
// whenMatchedUpdate* / whenMatchedDelete - confirm against the Scala API docs.
class DeltaMergeBuilder {
  def whenMatchedUpdate(set: Map[String, Column]): DeltaMergeBuilder
  def whenMatchedUpdate(condition: Column, set: Map[String, Column]): DeltaMergeBuilder
  def whenMatchedUpdateAll(): DeltaMergeBuilder
  def whenMatchedUpdateAll(condition: Column): DeltaMergeBuilder
  def whenMatchedDelete(): DeltaMergeBuilder
  def whenMatchedDelete(condition: Column): DeltaMergeBuilder
}

Unmatched Source Row Actions

Configure actions for source rows that don't match any target rows.

class DeltaMergeBuilder:
    def whenNotMatchedInsert(
        self,
        condition: Optional[Union[str, Column]] = None,
        values: Optional[Dict[str, Union[str, Column]]] = None,
    ) -> DeltaMergeBuilder:
        """Add an insert action for source rows that match no target row.

        Args:
            condition: Extra condition an unmatched source row must satisfy
                for the insert to apply. Optional.
            values: Mapping from column name to the value to insert, each
                value given as a SQL string or a Column. Required, even
                though the signature defaults it to None.

        Returns:
            The DeltaMergeBuilder, so further clauses can be chained.
        """
        ...

    def whenNotMatchedInsertAll(
        self,
        condition: Optional[Union[str, Column]] = None,
    ) -> DeltaMergeBuilder:
        """Add an action that inserts all columns of an unmatched source row.

        Args:
            condition: Extra condition an unmatched source row must satisfy
                for the insert to apply. Optional.

        Returns:
            The DeltaMergeBuilder, so further clauses can be chained.
        """
        ...
// Scala-style signature listing for the unmatched-source-row (insert)
// actions: each action is shown with and without an extra Column condition.
// NOTE(review): the io.delta.tables Scala API is believed to expose these as
// whenNotMatched(...).insert(...) / insertAll() rather than
// whenNotMatchedInsert* - confirm against the Scala API docs.
class DeltaMergeBuilder {
  def whenNotMatchedInsert(values: Map[String, Column]): DeltaMergeBuilder
  def whenNotMatchedInsert(condition: Column, values: Map[String, Column]): DeltaMergeBuilder
  def whenNotMatchedInsertAll(): DeltaMergeBuilder
  def whenNotMatchedInsertAll(condition: Column): DeltaMergeBuilder
}

Unmatched Target Row Actions

Configure actions for target rows that don't match any source rows.

class DeltaMergeBuilder:
    def whenNotMatchedBySourceUpdate(
        self,
        condition: Optional[Union[str, Column]] = None,
        set: Optional[Dict[str, Union[str, Column]]] = None,
    ) -> DeltaMergeBuilder:
        """Add an update action for target rows that match no source row.

        Args:
            condition: Extra condition such a target row must satisfy for
                the update to apply. Optional.
            set: Mapping from column name to its new value, each value given
                as a SQL string or a Column. Required, even though the
                signature defaults it to None.

        Returns:
            The DeltaMergeBuilder, so further clauses can be chained.
        """
        ...

    def whenNotMatchedBySourceDelete(
        self,
        condition: Optional[Union[str, Column]] = None,
    ) -> DeltaMergeBuilder:
        """Add a delete action for target rows that match no source row.

        Args:
            condition: Extra condition such a target row must satisfy for
                the delete to apply. Optional.

        Returns:
            The DeltaMergeBuilder, so further clauses can be chained.
        """
        ...
// Scala-style signature listing for the unmatched-target-row actions: each
// action is shown with and without an extra Column condition.
// NOTE(review): the io.delta.tables Scala API is believed to expose these as
// whenNotMatchedBySource(...).update(...) / delete() rather than
// whenNotMatchedBySourceUpdate / whenNotMatchedBySourceDelete - confirm
// against the Scala API docs.
class DeltaMergeBuilder {
  def whenNotMatchedBySourceUpdate(set: Map[String, Column]): DeltaMergeBuilder
  def whenNotMatchedBySourceUpdate(condition: Column, set: Map[String, Column]): DeltaMergeBuilder
  def whenNotMatchedBySourceDelete(): DeltaMergeBuilder
  def whenNotMatchedBySourceDelete(condition: Column): DeltaMergeBuilder
}

Schema Evolution

Enable automatic schema evolution during merge operations.

class DeltaMergeBuilder:
    def withSchemaEvolution(self) -> DeltaMergeBuilder:
        """Turn on schema evolution for this merge.

        Returns:
            The DeltaMergeBuilder with schema evolution enabled.
        """
        ...
// Scala-style signature for enabling schema evolution; takes no arguments and
// returns the builder so further clauses can be chained.
class DeltaMergeBuilder {
  def withSchemaEvolution(): DeltaMergeBuilder
}

Merge Execution

Execute the configured merge operation.

class DeltaMergeBuilder:
    def execute(self) -> DataFrame:
        """Run the merge with the actions configured on this builder.

        Returns:
            A DataFrame containing the metrics of the merge operation.
        """
        ...
// Scala-style signature for running the configured merge.
// NOTE(review): older Delta Lake releases are believed to declare
// execute(): Unit - confirm which release this listing documents.
class DeltaMergeBuilder {
  def execute(): DataFrame
}

Usage Examples

Basic Upsert Pattern

# Employee rows to fold into the table: changes to existing ids plus new ids
employee_rows = [
    (1, "Alice", "Engineering", 95000),
    (2, "Bob", "Marketing", 75000),
    (5, "Eve", "Sales", 70000),  # New record
]
updates_df = spark.createDataFrame(
    employee_rows, ["id", "name", "department", "salary"]
)

# Columns overwritten on a match; values are SQL expressions over "updates"
update_assignments = {
    "name": "updates.name",
    "department": "updates.department",
    "salary": "updates.salary",
}
# An insert additionally needs the key column
insert_values = {"id": "updates.id", **update_assignments}

# Upsert: update rows whose id matches, insert the rest
merge_builder = delta_table.alias("employees").merge(
    updates_df.alias("updates"),
    "employees.id = updates.id",
)
merge_builder = merge_builder.whenMatchedUpdate(set=update_assignments)
merge_builder = merge_builder.whenNotMatchedInsert(values=insert_values)
merge_builder.execute()

Complex Merge with Multiple Conditions

from pyspark.sql.functions import col, lit, current_timestamp

# Clause conditions, named for readability
is_active = col("source.status") == "active"
is_deleted = col("source.status") == "deleted"
has_positive_balance = col("source.balance") > 0

(
    delta_table.alias("target")
    .merge(
        source_df.alias("source"),
        "target.customer_id = source.customer_id",
    )
    # Matched rows: refresh active customers ...
    .whenMatchedUpdate(
        condition=is_active,
        set={
            "balance": col("source.balance"),
            "last_updated": current_timestamp(),
            "status": lit("updated"),
        },
    )
    # ... and remove customers flagged as deleted
    .whenMatchedDelete(condition=is_deleted)
    # Source rows with no target row: insert only when the balance is positive
    .whenNotMatchedInsert(
        condition=has_positive_balance,
        values={
            "customer_id": col("source.customer_id"),
            "balance": col("source.balance"),
            "status": lit("new"),
            "created_at": current_timestamp(),
        },
    )
    # Target rows with no source row: mark as inactive
    .whenNotMatchedBySourceUpdate(set={"status": lit("inactive")})
    .execute()
)

Schema Evolution Example

# Enable schema evolution to handle new columns.
# Both sides are aliased so that the "target." and "source." qualifiers used
# in the merge condition can be resolved.
delta_table.alias("target").merge(
    source_with_new_columns_df.alias("source"),
    "target.id = source.id"
).withSchemaEvolution().whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Merge Constraints

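  • Multiple whenMatched clauses: When a merge has more than one whenMatched clause, only the last one may omit its condition
  • Order matters: Clauses are evaluated in the order they are specified, and only the action of the first clause whose condition is satisfied is executed
  • Matched-clause expressions: Conditions and values in whenMatched clauses can reference both source and target columns
  • Not-matched-by-source expressions: Conditions and values in whenNotMatchedBySource clauses can reference only target columns
  • Schema evolution: Available for insert and update operations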

Install with Tessl CLI

npx tessl i tessl/pypi-delta-spark

docs

configuration.md

index.md

merge-operations.md

optimization.md

table-management.md

table-operations.md

time-travel.md

tile.json