# Merge Operations

Advanced upsert functionality for Delta Lake tables, supporting complex merge patterns with conditional logic. Enables efficient data synchronization, change data capture (CDC) operations, and schema evolution during merges.

## Capabilities

### Merge Initialization

Start a merge operation by specifying the source data and a join condition.

```python { .api }
class DeltaTable:
    def merge(
        self,
        source: DataFrame,
        condition: Union[str, Column]
    ) -> DeltaMergeBuilder:
        """
        Merge data from source DataFrame based on condition.

        Parameters:
        - source: Source DataFrame to merge
        - condition: Join condition as SQL string or Column expression

        Returns:
        DeltaMergeBuilder for configuring merge actions
        """
```

```scala { .api }
class DeltaTable {
  def merge(source: DataFrame, condition: String): DeltaMergeBuilder
  def merge(source: DataFrame, condition: Column): DeltaMergeBuilder
}
```
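
When the merge key spans several columns, the condition string can be assembled from a list of key names. The following is a minimal sketch in plain Python; the helper name `build_merge_condition` and the default aliases `target` and `source` are assumptions made for illustration and are not part of the Delta Lake API.

```python
from typing import Sequence


def build_merge_condition(
    keys: Sequence[str],
    target_alias: str = "target",
    source_alias: str = "source",
) -> str:
    """Build an equality join condition over the given key columns."""
    if not keys:
        raise ValueError("at least one key column is required")
    return " AND ".join(
        f"{target_alias}.{key} = {source_alias}.{key}" for key in keys
    )


# Hypothetical composite key; the aliases must match those set with .alias()
condition = build_merge_condition(["customer_id", "region"])
print(condition)
# target.customer_id = source.customer_id AND target.region = source.region
```

The resulting string could be passed as the `condition` argument of `merge`, provided the table and the source DataFrame were aliased with the same names.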

### Matched Row Actions

Configure actions for rows that match the merge condition.

```python { .api }
class DeltaMergeBuilder:
    def whenMatchedUpdate(
        self,
        condition: Optional[Union[str, Column]] = None,
        set: Optional[Dict[str, Union[str, Column]]] = None
    ) -> DeltaMergeBuilder:
        """
        Update matched rows with optional additional condition.

        Parameters:
        - condition: Optional additional condition for update
        - set: Column mappings for update (required)

        Returns:
        DeltaMergeBuilder for method chaining
        """

    def whenMatchedUpdateAll(
        self,
        condition: Optional[Union[str, Column]] = None
    ) -> DeltaMergeBuilder:
        """
        Update all columns of matched rows with source values.

        Parameters:
        - condition: Optional condition for update

        Returns:
        DeltaMergeBuilder for method chaining
        """

    def whenMatchedDelete(
        self,
        condition: Optional[Union[str, Column]] = None
    ) -> DeltaMergeBuilder:
        """
        Delete matched rows with optional condition.

        Parameters:
        - condition: Optional condition for deletion

        Returns:
        DeltaMergeBuilder for method chaining
        """
```

```scala { .api }
class DeltaMergeBuilder {
  def whenMatched(): DeltaMergeMatchedActionBuilder
  def whenMatched(condition: String): DeltaMergeMatchedActionBuilder
  def whenMatched(condition: Column): DeltaMergeMatchedActionBuilder
}

// Returned by whenMatched; each action returns the DeltaMergeBuilder for chaining
class DeltaMergeMatchedActionBuilder {
  def update(set: Map[String, Column]): DeltaMergeBuilder
  def updateExpr(set: Map[String, String]): DeltaMergeBuilder
  def updateAll(): DeltaMergeBuilder
  def delete(): DeltaMergeBuilder
}
```

### Unmatched Source Row Actions

Configure actions for source rows that don't match any target rows.

```python { .api }
class DeltaMergeBuilder:
    def whenNotMatchedInsert(
        self,
        condition: Optional[Union[str, Column]] = None,
        values: Optional[Dict[str, Union[str, Column]]] = None
    ) -> DeltaMergeBuilder:
        """
        Insert unmatched source rows with optional condition.

        Parameters:
        - condition: Optional condition for insertion
        - values: Column mappings for insert (required)

        Returns:
        DeltaMergeBuilder for method chaining
        """

    def whenNotMatchedInsertAll(
        self,
        condition: Optional[Union[str, Column]] = None
    ) -> DeltaMergeBuilder:
        """
        Insert all columns from unmatched source rows.

        Parameters:
        - condition: Optional condition for insertion

        Returns:
        DeltaMergeBuilder for method chaining
        """
```

```scala { .api }
class DeltaMergeBuilder {
  def whenNotMatched(): DeltaMergeNotMatchedActionBuilder
  def whenNotMatched(condition: String): DeltaMergeNotMatchedActionBuilder
  def whenNotMatched(condition: Column): DeltaMergeNotMatchedActionBuilder
}

// Returned by whenNotMatched; each action returns the DeltaMergeBuilder for chaining
class DeltaMergeNotMatchedActionBuilder {
  def insert(values: Map[String, Column]): DeltaMergeBuilder
  def insertExpr(values: Map[String, String]): DeltaMergeBuilder
  def insertAll(): DeltaMergeBuilder
}
```

### Unmatched Target Row Actions

Configure actions for target rows that don't match any source rows.

```python { .api }
class DeltaMergeBuilder:
    def whenNotMatchedBySourceUpdate(
        self,
        condition: Optional[Union[str, Column]] = None,
        set: Optional[Dict[str, Union[str, Column]]] = None
    ) -> DeltaMergeBuilder:
        """
        Update target rows not matched by source.

        Parameters:
        - condition: Optional condition for update
        - set: Column mappings for update (required)

        Returns:
        DeltaMergeBuilder for method chaining
        """

    def whenNotMatchedBySourceDelete(
        self,
        condition: Optional[Union[str, Column]] = None
    ) -> DeltaMergeBuilder:
        """
        Delete target rows not matched by source.

        Parameters:
        - condition: Optional condition for deletion

        Returns:
        DeltaMergeBuilder for method chaining
        """
```
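
```scala { .api }
class DeltaMergeBuilder {
  def whenNotMatchedBySource(): DeltaMergeNotMatchedBySourceActionBuilder
  def whenNotMatchedBySource(condition: String): DeltaMergeNotMatchedBySourceActionBuilder
  def whenNotMatchedBySource(condition: Column): DeltaMergeNotMatchedBySourceActionBuilder
}

// Returned by whenNotMatchedBySource; each action returns the DeltaMergeBuilder for chaining
class DeltaMergeNotMatchedBySourceActionBuilder {
  def update(set: Map[String, Column]): DeltaMergeBuilder
  def updateExpr(set: Map[String, String]): DeltaMergeBuilder
  def delete(): DeltaMergeBuilder
}
```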
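
One use of the unmatched-target clauses is making the target mirror a complete snapshot of the source. The sketch below chains only the Python methods listed in this document; the function name `sync_to_snapshot`, the `key` parameter, and the aliases are assumptions for illustration. The unconditional delete removes every target row that is absent from the snapshot, so it is only appropriate when the source really is complete.

```python
def sync_to_snapshot(delta_table, snapshot_df, key: str = "id"):
    """Make the target table mirror `snapshot_df` (hypothetical helper).

    `delta_table` is expected to be a DeltaTable and `snapshot_df` a
    DataFrame holding the complete desired state; both need a `key` column.
    """
    return (
        delta_table.alias("target")
        .merge(snapshot_df.alias("source"), f"target.{key} = source.{key}")
        .whenMatchedUpdateAll()          # refresh rows present on both sides
        .whenNotMatchedInsertAll()       # add rows found only in the snapshot
        .whenNotMatchedBySourceDelete()  # drop target rows missing from the snapshot
        .execute()
    )
```

For an incremental feed that carries only changed rows, the `whenNotMatchedBySourceDelete()` step would need a condition or should be left out, since unchanged target rows would otherwise be deleted.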

### Schema Evolution

Enable automatic schema evolution during merge operations.

```python { .api }
class DeltaMergeBuilder:
    def withSchemaEvolution(self) -> DeltaMergeBuilder:
        """
        Enable schema evolution for merge operation.

        Returns:
        DeltaMergeBuilder with schema evolution enabled
        """
```

```scala { .api }
class DeltaMergeBuilder {
  def withSchemaEvolution(): DeltaMergeBuilder
}
```

### Merge Execution

Execute the configured merge operation.

```python { .api }
class DeltaMergeBuilder:
    def execute(self) -> DataFrame:
        """
        Execute the merge operation.

        Returns:
        DataFrame with merge operation metrics
        """
```

```scala { .api }
class DeltaMergeBuilder {
  def execute(): DataFrame
}
```
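
The signature above says that `execute()` returns a DataFrame of merge metrics. This document does not list the metric column names, so the sketch below assumes none of them and simply converts the first result row to a dictionary. The helper name `merge_metrics` is hypothetical, and the `None` branch is a defensive assumption for environments where `execute()` returns nothing.

```python
from typing import Any, Dict


def merge_metrics(result: Any) -> Dict[str, Any]:
    """Return the first row of a merge result as a plain dict (hypothetical helper)."""
    if result is None:      # defensive: nothing was returned
        return {}
    row = result.first()    # DataFrame.first() returns a Row, or None if empty
    return row.asDict() if row is not None else {}
```

A call such as `merge_metrics(builder.execute())` would then give a dictionary that can be logged or inspected without knowing the column names in advance.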

## Usage Examples

### Basic Upsert Pattern

```python
# Assumes `spark` is an active SparkSession and `delta_table` is an existing DeltaTable.
# Source data with updates and new records
updates_df = spark.createDataFrame([
    (1, "Alice", "Engineering", 95000),
    (2, "Bob", "Marketing", 75000),
    (5, "Eve", "Sales", 70000)  # New record
], ["id", "name", "department", "salary"])

# Perform upsert merge
delta_table.alias("employees").merge(
    updates_df.alias("updates"),
    "employees.id = updates.id"
).whenMatchedUpdate(set={
    "name": "updates.name",
    "department": "updates.department",
    "salary": "updates.salary"
}).whenNotMatchedInsert(values={
    "id": "updates.id",
    "name": "updates.name",
    "department": "updates.department",
    "salary": "updates.salary"
}).execute()
```
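
The `set` and `values` dictionaries in the example above repeat the same `column: updates.column` pattern, which can be generated from a list of column names. This is a minimal sketch in plain Python; the helper name `source_column_mapping` is an assumption, and the default alias `updates` simply mirrors the example.

```python
from typing import Dict, Iterable


def source_column_mapping(
    columns: Iterable[str], source_alias: str = "updates"
) -> Dict[str, str]:
    """Map each target column to the same-named column of the aliased source."""
    return {column: f"{source_alias}.{column}" for column in columns}


all_columns = ["id", "name", "department", "salary"]

# Insert every column; update every column except the join key.
insert_values = source_column_mapping(all_columns)
update_set = source_column_mapping(c for c in all_columns if c != "id")
```

These dictionaries could then be passed as `values=insert_values` and `set=update_set`, which keeps the two clauses consistent when columns are added later.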

### Complex Merge with Multiple Conditions

```python
from pyspark.sql.functions import col, lit, current_timestamp

# Assumes `delta_table` is an existing DeltaTable and `source_df` is a DataFrame
# with customer_id, balance, and status columns.
delta_table.alias("target").merge(
    source_df.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    condition=col("source.status") == "active",
    set={
        "balance": col("source.balance"),
        "last_updated": current_timestamp(),
        "status": lit("updated")
    }
).whenMatchedDelete(
    condition=col("source.status") == "deleted"
).whenNotMatchedInsert(
    condition=col("source.balance") > 0,
    values={
        "customer_id": col("source.customer_id"),
        "balance": col("source.balance"),
        "status": lit("new"),
        "created_at": current_timestamp()
    }
).whenNotMatchedBySourceUpdate(
    set={"status": lit("inactive")}
).execute()
```

### Schema Evolution Example

```python
# Enable schema evolution to handle new columns.
# Both sides are aliased so that the condition can refer to `target` and `source`.
delta_table.alias("target").merge(
    source_with_new_columns_df.alias("source"),
    "target.id = source.id"
).withSchemaEvolution().whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```
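
Before enabling schema evolution, it can help to see which source columns are missing from the target. The sketch below compares two lists of column names, such as those exposed by `DataFrame.columns`. The helper name `columns_added_by_source` is hypothetical, and the case-insensitive comparison assumes the session uses Spark's default, case-insensitive column resolution.

```python
from typing import List, Sequence


def columns_added_by_source(
    target_columns: Sequence[str], source_columns: Sequence[str]
) -> List[str]:
    """Return source columns absent from the target, compared case-insensitively."""
    existing = {name.lower() for name in target_columns}
    return [name for name in source_columns if name.lower() not in existing]


# Hypothetical column lists, e.g. taken from delta_table.toDF().columns
# and source_with_new_columns_df.columns.
new_columns = columns_added_by_source(
    ["id", "name", "salary"],
    ["ID", "name", "salary", "bonus"],
)
print(new_columns)  # ['bonus']
```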

## Merge Constraints

- Multiple `whenMatched` clauses: When a merge has more than one, only the last may omit its condition
- Order matters: Clauses are evaluated in the order they are specified, and for each row only the first clause whose condition holds is executed
- Matched clauses: Conditions and update expressions can reference both source and target columns
- `whenNotMatchedBySource` clauses: Conditions and update expressions can reference only target columns, because these rows have no matching source row
- Schema evolution: Available for insert and update actions
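
The ordering rule can be illustrated with a small plain-Python model. This is not Delta Lake's implementation, and `Clause` and `first_matching_action` are hypothetical names. Each clause pairs an optional condition with an action label, and a row takes the action of the first clause whose condition holds. A clause without a condition always applies, which is why only the last clause may omit it.

```python
from typing import Callable, NamedTuple, Optional, Sequence


class Clause(NamedTuple):
    action: str
    condition: Optional[Callable[[dict], bool]] = None  # None means "always"


def first_matching_action(row: dict, clauses: Sequence[Clause]) -> Optional[str]:
    """Return the action of the first clause that applies to `row`, if any."""
    for clause in clauses:
        if clause.condition is None or clause.condition(row):
            return clause.action
    return None  # no clause applied; the row is left unchanged


matched_clauses = [
    Clause("delete", lambda r: r["status"] == "deleted"),
    Clause("update", lambda r: r["status"] == "active"),
    Clause("update_all"),  # unconditional, so it must come last
]

print(first_matching_action({"status": "deleted"}, matched_clauses))  # delete
print(first_matching_action({"status": "active"}, matched_clauses))   # update
print(first_matching_action({"status": "paused"}, matched_clauses))   # update_all
```

If the unconditional clause were placed first in this model, the two conditional clauses after it could never run.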