# Merge Operations


Advanced upsert functionality for Delta Lake tables supporting complex merge patterns with conditional logic. Enables efficient data synchronization, CDC operations, and schema evolution during merge operations.


## Capabilities


### Merge Initialization


Start a merge operation by specifying source data and join conditions.


```python { .api }
class DeltaTable:
    def merge(
        self,
        source: DataFrame,
        condition: Union[str, Column]
    ) -> DeltaMergeBuilder:
        """
        Merge data from source DataFrame based on condition.

        Parameters:
        - source: Source DataFrame to merge
        - condition: Join condition as SQL string or Column expression

        Returns:
        DeltaMergeBuilder for configuring merge actions
        """
```


```scala { .api }
class DeltaTable {
  def merge(source: DataFrame, condition: String): DeltaMergeBuilder
  def merge(source: DataFrame, condition: Column): DeltaMergeBuilder
}
```


### Matched Row Actions


Configure actions for rows that match the merge condition.


```python { .api }
class DeltaMergeBuilder:
    def whenMatchedUpdate(
        self,
        condition: Optional[Union[str, Column]] = None,
        set: Optional[Dict[str, Union[str, Column]]] = None
    ) -> DeltaMergeBuilder:
        """
        Update matched rows with optional additional condition.

        Parameters:
        - condition: Optional additional condition for update
        - set: Column mappings for update (required)

        Returns:
        DeltaMergeBuilder for method chaining
        """

    def whenMatchedUpdateAll(
        self,
        condition: Optional[Union[str, Column]] = None
    ) -> DeltaMergeBuilder:
        """
        Update all columns of matched rows with source values.

        Parameters:
        - condition: Optional condition for update

        Returns:
        DeltaMergeBuilder for method chaining
        """

    def whenMatchedDelete(
        self,
        condition: Optional[Union[str, Column]] = None
    ) -> DeltaMergeBuilder:
        """
        Delete matched rows with optional condition.

        Parameters:
        - condition: Optional condition for deletion

        Returns:
        DeltaMergeBuilder for method chaining
        """
```


```scala { .api }
class DeltaMergeBuilder {
  def whenMatchedUpdate(set: Map[String, Column]): DeltaMergeBuilder
  def whenMatchedUpdate(condition: Column, set: Map[String, Column]): DeltaMergeBuilder
  def whenMatchedUpdateAll(): DeltaMergeBuilder
  def whenMatchedUpdateAll(condition: Column): DeltaMergeBuilder
  def whenMatchedDelete(): DeltaMergeBuilder
  def whenMatchedDelete(condition: Column): DeltaMergeBuilder
}
```


### Unmatched Source Row Actions


Configure actions for source rows that don't match any target rows.


```python { .api }
class DeltaMergeBuilder:
    def whenNotMatchedInsert(
        self,
        condition: Optional[Union[str, Column]] = None,
        values: Optional[Dict[str, Union[str, Column]]] = None
    ) -> DeltaMergeBuilder:
        """
        Insert unmatched source rows with optional condition.

        Parameters:
        - condition: Optional condition for insertion
        - values: Column mappings for insert (required)

        Returns:
        DeltaMergeBuilder for method chaining
        """

    def whenNotMatchedInsertAll(
        self,
        condition: Optional[Union[str, Column]] = None
    ) -> DeltaMergeBuilder:
        """
        Insert all columns from unmatched source rows.

        Parameters:
        - condition: Optional condition for insertion

        Returns:
        DeltaMergeBuilder for method chaining
        """
```


```scala { .api }
class DeltaMergeBuilder {
  def whenNotMatchedInsert(values: Map[String, Column]): DeltaMergeBuilder
  def whenNotMatchedInsert(condition: Column, values: Map[String, Column]): DeltaMergeBuilder
  def whenNotMatchedInsertAll(): DeltaMergeBuilder
  def whenNotMatchedInsertAll(condition: Column): DeltaMergeBuilder
}
```


### Unmatched Target Row Actions


Configure actions for target rows that don't match any source rows.


```python { .api }
class DeltaMergeBuilder:
    def whenNotMatchedBySourceUpdate(
        self,
        condition: Optional[Union[str, Column]] = None,
        set: Optional[Dict[str, Union[str, Column]]] = None
    ) -> DeltaMergeBuilder:
        """
        Update target rows not matched by source.

        Parameters:
        - condition: Optional condition for update
        - set: Column mappings for update (required)

        Returns:
        DeltaMergeBuilder for method chaining
        """

    def whenNotMatchedBySourceDelete(
        self,
        condition: Optional[Union[str, Column]] = None
    ) -> DeltaMergeBuilder:
        """
        Delete target rows not matched by source.

        Parameters:
        - condition: Optional condition for deletion

        Returns:
        DeltaMergeBuilder for method chaining
        """
```


```scala { .api }
class DeltaMergeBuilder {
  def whenNotMatchedBySourceUpdate(set: Map[String, Column]): DeltaMergeBuilder
  def whenNotMatchedBySourceUpdate(condition: Column, set: Map[String, Column]): DeltaMergeBuilder
  def whenNotMatchedBySourceDelete(): DeltaMergeBuilder
  def whenNotMatchedBySourceDelete(condition: Column): DeltaMergeBuilder
}
```
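
The three clause families (`whenMatched*`, `whenNotMatched*`, `whenNotMatchedBySource*`) apply to three disjoint groups of rows. The following plain-Python sketch illustrates that grouping without Spark. It is a simplified model, not Delta Lake's implementation: `classify_rows` is a hypothetical helper, rows are plain dictionaries, and the merge condition is assumed to be equality on a single key column.

```python
from typing import Dict, List


def classify_rows(target: List[dict], source: List[dict], key: str) -> Dict[str, list]:
    """Group key values by which family of merge clauses would handle them."""
    target_keys = {row[key] for row in target}
    source_keys = {row[key] for row in source}
    return {
        # Key present on both sides: handled by whenMatched* clauses
        "matched": sorted(target_keys & source_keys),
        # Key only in the source: handled by whenNotMatched* (insert) clauses
        "not_matched": sorted(source_keys - target_keys),
        # Key only in the target: handled by whenNotMatchedBySource* clauses
        "not_matched_by_source": sorted(target_keys - source_keys),
    }


target = [{"id": 1}, {"id": 2}, {"id": 3}]
source = [{"id": 2}, {"id": 3}, {"id": 4}]
groups = classify_rows(target, source, "id")
print(groups)
# {'matched': [2, 3], 'not_matched': [4], 'not_matched_by_source': [1]}
```

In this model, row 1 exists only in the target, so only a `whenNotMatchedBySource` clause could update or delete it, while row 4 exists only in the source and could only be inserted.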


### Schema Evolution


Enable automatic schema evolution during merge operations.


```python { .api }
class DeltaMergeBuilder:
    def withSchemaEvolution(self) -> DeltaMergeBuilder:
        """
        Enable schema evolution for merge operation.

        Returns:
        DeltaMergeBuilder with schema evolution enabled
        """
```


```scala { .api }
class DeltaMergeBuilder {
  def withSchemaEvolution(): DeltaMergeBuilder
}
```
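
As a rough intuition for what schema evolution does to the target schema, the plain-Python sketch below appends source-only columns to the target's column list and fills them with `None` for rows that predate the change. This is an illustrative model only, under the assumption of flat, case-sensitive column names; `evolve_schema` and `pad_row` are hypothetical helpers, and the sketch ignores details such as nested fields and type changes.

```python
from typing import List, Optional


def evolve_schema(target_columns: List[str], source_columns: List[str]) -> List[str]:
    """Return the target columns followed by any columns only the source has."""
    new_columns = [c for c in source_columns if c not in target_columns]
    return list(target_columns) + new_columns


def pad_row(row: dict, columns: List[str]) -> dict:
    """Project a row onto the evolved schema, using None for missing columns."""
    padded: dict = {}
    for column in columns:
        value: Optional[object] = row.get(column)  # None when the column is new
        padded[column] = value
    return padded


columns = evolve_schema(["id", "name"], ["id", "name", "email"])
print(columns)
# ['id', 'name', 'email']

existing_row = pad_row({"id": 1, "name": "Alice"}, columns)
print(existing_row)
# {'id': 1, 'name': 'Alice', 'email': None}
```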


### Merge Execution


Execute the configured merge operation.


```python { .api }
class DeltaMergeBuilder:
    def execute(self) -> DataFrame:
        """
        Execute the merge operation.

        Returns:
        DataFrame with merge operation metrics
        """
```


```scala { .api }
class DeltaMergeBuilder {
  def execute(): DataFrame
}
```


## Usage Examples


### Basic Upsert Pattern


```python
# Source data with updates and new records
updates_df = spark.createDataFrame([
    (1, "Alice", "Engineering", 95000),
    (2, "Bob", "Marketing", 75000),
    (5, "Eve", "Sales", 70000)  # New record
], ["id", "name", "department", "salary"])

# Perform upsert merge
delta_table.alias("employees").merge(
    updates_df.alias("updates"),
    "employees.id = updates.id"
).whenMatchedUpdate(set={
    "name": "updates.name",
    "department": "updates.department",
    "salary": "updates.salary"
}).whenNotMatchedInsert(values={
    "id": "updates.id",
    "name": "updates.name",
    "department": "updates.department",
    "salary": "updates.salary"
}).execute()
```


### Complex Merge with Multiple Conditions


```python
from pyspark.sql.functions import col, lit, current_timestamp

delta_table.alias("target").merge(
    source_df.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    condition=col("source.status") == "active",
    set={
        "balance": col("source.balance"),
        "last_updated": current_timestamp(),
        "status": lit("updated")
    }
).whenMatchedDelete(
    condition=col("source.status") == "deleted"
).whenNotMatchedInsert(
    condition=col("source.balance") > 0,
    values={
        "customer_id": col("source.customer_id"),
        "balance": col("source.balance"),
        "status": lit("new"),
        "created_at": current_timestamp()
    }
).whenNotMatchedBySourceUpdate(
    set={"status": lit("inactive")}
).execute()
```


### Schema Evolution Example


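```python
# Enable schema evolution to handle new columns.
# The aliases are needed so that "target" and "source" in the
# merge condition resolve to the table and the DataFrame.
delta_table.alias("target").merge(
    source_with_new_columns_df.alias("source"),
    "target.id = source.id"
).withSchemaEvolution().whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```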


## Merge Constraints

- Multiple `whenMatched` clauses: Only the last one can omit its condition
- Order matters: Clauses are evaluated in the order they are specified, and the action of the first clause whose condition holds is executed
- Source expressions: Can reference both source and target columns in matched clauses
- Target expressions: Only target columns can be referenced in `whenNotMatchedBySource` clauses
- Schema evolution: Available for insert and update operations
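
The ordering rules can be illustrated without Spark. The following is a minimal plain-Python sketch, not Delta Lake's implementation: `Clause`, `validate`, and `resolve_action` are hypothetical names, and each clause is modeled as an action label with an optional predicate. It shows why an unconditional clause acts as a catch-all and therefore only makes sense in the last position.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Clause:
    action: str
    condition: Optional[Callable[[dict], bool]] = None  # None means "always applies"


def validate(clauses: List[Clause]) -> None:
    """Reject clause lists where a non-final clause has no condition."""
    for clause in clauses[:-1]:
        if clause.condition is None:
            raise ValueError("only the last clause may omit its condition")


def resolve_action(clauses: List[Clause], row: dict) -> Optional[str]:
    """Return the action of the first clause whose condition holds for the row."""
    validate(clauses)
    for clause in clauses:
        if clause.condition is None or clause.condition(row):
            return clause.action
    return None  # no clause applies, so the row is left unchanged


matched_clauses = [
    Clause("delete", lambda r: r["status"] == "deleted"),
    Clause("update", lambda r: r["status"] == "active"),
    Clause("update_all"),  # unconditional catch-all, must come last
]

print(resolve_action(matched_clauses, {"status": "deleted"}))  # delete
print(resolve_action(matched_clauses, {"status": "active"}))   # update
print(resolve_action(matched_clauses, {"status": "pending"}))  # update_all
```

If the unconditional clause came first in this model, it would shadow every clause after it, which is the situation the first constraint rules out.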