# Writing and Data Modification

Functions for writing data to Delta tables and modifying existing records through update, delete, and merge operations. Provides ACID transaction guarantees and supports various data sources.

## Capabilities

### Writing Data

```python { .api }
def write_deltalake(
    table_or_uri: str | Path | DeltaTable,
    data: ArrowStreamExportable | ArrowArrayExportable | Sequence[ArrowArrayExportable],
    *,
    partition_by: list[str] | str | None = None,
    mode: Literal["error", "append", "overwrite", "ignore"] = "error",
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    schema_mode: Literal["merge", "overwrite"] | None = None,
    storage_options: dict[str, str] | None = None,
    target_file_size: int | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> None: ...
```

Write data to a Delta table with comprehensive configuration options.

**Parameters:**

- `table_or_uri`: Target table path or existing DeltaTable instance
- `data`: Data to write (pandas DataFrame, PyArrow Table, RecordBatch, etc.)
- `partition_by`: Column names for partitioning
- `mode`: Write mode behavior (`"error"`, `"append"`, `"overwrite"`, or `"ignore"`)
- `schema_mode`: How schema differences between the data and the table are handled (`"merge"` or `"overwrite"`); see the sketch below
- `storage_options`: Backend-specific configuration
- `writer_properties`: Parquet writer configuration
- `commit_properties`: Transaction commit settings
- `post_commithook_properties`: Post-commit operations
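
For example, `schema_mode="merge"` lets an append add a column that the table does not have yet. The sketch below is a minimal illustration (the table path and column names are made up), assuming the installed deltalake version supports schema evolution on append:

```python
import pandas as pd
from deltalake import write_deltalake

# Create a table with columns: id, name
write_deltalake("path/to/people", pd.DataFrame({"id": [1], "name": ["Alice"]}))

# The next batch carries an extra "email" column; schema_mode="merge"
# evolves the table schema instead of raising a schema mismatch error.
batch = pd.DataFrame({"id": [2], "name": ["Bob"], "email": ["bob@example.com"]})
write_deltalake("path/to/people", batch, mode="append", schema_mode="merge")
```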

### Converting Existing Tables

```python { .api }
def convert_to_deltalake(
    uri: str | Path,
    mode: Literal["error", "ignore"] = "error",
    partition_by: Schema | None = None,
    partition_strategy: Literal["hive"] | None = None,
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    storage_options: dict[str, str] | None = None,
    commit_properties: CommitProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
) -> None: ...
```

Convert an existing Parquet table to Delta Lake format in place by writing a Delta transaction log alongside the existing Parquet files.

### Update Operations

```python { .api }
def update(
    self,
    updates: dict[str, str] | None = None,
    new_values: dict[str, int | float | str | datetime | bool | list[Any]] | None = None,
    predicate: str | None = None,
    writer_properties: WriterProperties | None = None,
    error_on_type_mismatch: bool = True,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]: ...
```

Update records in the Delta table matching an optional predicate. Returns a dictionary of operation metrics (for example `num_updated_rows`).

**Parameters:**

- `updates`: Column name to SQL expression mapping
- `new_values`: Column name to Python value mapping
- `predicate`: SQL WHERE clause for filtering rows to update
- `writer_properties`: Parquet writer configuration
- `error_on_type_mismatch`: Raise error on type conflicts

### Delete Operations

```python { .api }
def delete(
    self,
    predicate: str | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]: ...
```

Delete records matching the specified predicate. If no predicate is given, all rows are deleted. Returns a dictionary of operation metrics (for example `num_deleted_rows`).

### Merge Operations

```python { .api }
def merge(
    self,
    source: pyarrow.Table | pyarrow.RecordBatch | pyarrow.dataset.Dataset,
    predicate: str,
    source_alias: str | None = None,
    target_alias: str | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> TableMerger: ...
```

Start a merge operation (UPSERT) with the specified source data. Returns a `TableMerger` builder; chain the `when_*` clauses below and call `execute()` to run the merge.

### TableMerger Class

```python { .api }
class TableMerger:
    def when_matched_update(
        self,
        updates: dict[str, str] | None = None,
        predicate: str | None = None,
    ) -> TableMerger: ...

    def when_matched_update_all(self, predicate: str | None = None) -> TableMerger: ...

    def when_matched_delete(self, predicate: str | None = None) -> TableMerger: ...

    def when_not_matched_insert(
        self,
        updates: dict[str, str] | None = None,
        predicate: str | None = None,
    ) -> TableMerger: ...

    def when_not_matched_insert_all(self, predicate: str | None = None) -> TableMerger: ...

    def execute(self) -> dict[str, Any]: ...
```

Builder pattern for constructing complex merge operations. Each `when_*` method returns the merger for chaining, and `execute()` runs the merge and returns a dictionary of operation metrics.

## Usage Examples

### Basic Writing Operations

```python
from deltalake import write_deltalake, DeltaTable
import pandas as pd

# Create sample data
data = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'department': ['Engineering', 'Sales', 'Engineering', 'Sales', 'Marketing']
})

# Write to new table
write_deltalake("path/to/new-table", data, mode="error")

# Append more data
new_data = pd.DataFrame({
    'id': [6, 7],
    'name': ['Frank', 'Grace'],
    'age': [29, 27],
    'department': ['Engineering', 'Marketing']
})

write_deltalake("path/to/new-table", new_data, mode="append")

# Overwrite entire table
write_deltalake("path/to/new-table", data, mode="overwrite")
```

### Partitioned Writing

```python
import numpy as np

# Write with partitioning
write_deltalake(
    "path/to/partitioned-table",
    data,
    partition_by=["department"],
    mode="error"
)

# Write with multiple partition columns
sales_data = pd.DataFrame({
    'id': range(100),
    'sale_date': pd.date_range('2023-01-01', periods=100),
    'amount': np.random.randint(100, 1000, 100),
    'region': np.random.choice(['US', 'EU', 'ASIA'], 100)
})

write_deltalake(
    "path/to/sales-table",
    sales_data,
    partition_by=["region", "sale_date"],
    mode="error"
)
```

200

201

### Update Operations

202

203

```python

204

dt = DeltaTable("path/to/table")

205

206

# Update with new values

207

result = dt.update(

208

predicate="age < 30",

209

new_values={"department": "Junior Engineering"}

210

)

211

print(f"Updated {result['num_updated_rows']} rows")

212

213

# Update with SQL expressions

214

dt.update(

215

predicate="department = 'Sales'",

216

updates={"age": "age + 1"} # Increment age by 1

217

)

218

219

# Conditional updates

220

dt.update(

221

predicate="name = 'Alice'",

222

new_values={

223

"age": 26,

224

"department": "Senior Engineering"

225

}

226

)

227

```

### Delete Operations

```python
# Delete specific records
result = dt.delete(predicate="age > 35")
print(f"Deleted {result['num_deleted_rows']} rows")

# Delete all records from a department
dt.delete(predicate="department = 'Marketing'")

# Delete without predicate (truncate)
dt.delete()  # Deletes all rows
```

### Merge Operations (UPSERT)

```python
import pyarrow as pa

# Source data for merge
source_data = pa.table({
    'id': [2, 3, 6],
    'name': ['Bob Updated', 'Charlie Updated', 'New Person'],
    'age': [31, 36, 40],
    'department': ['Sales', 'Engineering', 'HR']
})

# Perform merge operation
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target"
    )
    .when_matched_update_all()      # Update all columns when matched
    .when_not_matched_insert_all()  # Insert when not matched
    .execute()
)

print("Merge completed:")
print(f"- Rows updated: {merge_result['num_target_rows_updated']}")
print(f"- Rows inserted: {merge_result['num_target_rows_inserted']}")
```

### Advanced Merge with Conditions

```python
# Complex merge with conditions
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id"
    )
    .when_matched_update(
        updates={
            "name": "source.name",
            "age": "source.age"
        },
        predicate="target.age < source.age"  # Only update if source age is higher
    )
    .when_matched_delete(
        predicate="source.department = 'DELETED'"  # Delete rows marked for deletion
    )
    .when_not_matched_insert(
        updates={
            "id": "source.id",
            "name": "source.name",
            "age": "source.age",
            "department": "source.department"
        },
        predicate="source.age >= 18"  # Only insert adults
    )
    .execute()
)
```

### Converting Existing Tables

```python
import pyarrow as pa
from deltalake import convert_to_deltalake

# Convert a partitioned Parquet table to Delta; partition columns are
# described with a pyarrow schema (the column types here are illustrative)
convert_to_deltalake(
    "path/to/parquet-table",
    partition_by=pa.schema([("year", pa.int32()), ("month", pa.int32())]),
    partition_strategy="hive",  # hive-style partition layout
    name="My Converted Table",
    description="Converted from Parquet format"
)

# Convert with storage configuration
convert_to_deltalake(
    "s3://bucket/parquet-data",
    storage_options={
        "AWS_REGION": "us-west-2",
        "AWS_ACCESS_KEY_ID": "key",
        "AWS_SECRET_ACCESS_KEY": "secret"
    }
)
```

### Write Configuration

```python
from deltalake import WriterProperties

# Configure writer properties
writer_props = WriterProperties(
    compression="SNAPPY",
    max_row_group_size=10000,
    write_batch_size=1000
)

# Write with custom properties
write_deltalake(
    "path/to/table",
    data,
    writer_properties=writer_props,
    target_file_size=64 * 1024 * 1024,  # 64MB files
    configuration={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
```

## Writer Configuration Classes

### BloomFilterProperties

```python { .api }
@dataclass
class BloomFilterProperties:
    def __init__(
        self,
        set_bloom_filter_enabled: bool | None,
        fpp: float | None = None,
        ndv: int | None = None,
    ) -> None: ...

    set_bloom_filter_enabled: bool | None
    fpp: float | None
    ndv: int | None
```

Configure bloom filter properties for parquet writer optimization. Bloom filters help skip data files during reads by providing probabilistic membership testing.

**Parameters:**

- `set_bloom_filter_enabled`: Enable the bloom filter with default values if no `fpp`/`ndv` are provided
- `fpp`: False positive probability (between 0 and 1, exclusive)
- `ndv`: Number of distinct values for the bloom filter
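
A minimal construction sketch (the tuning values are illustrative, and it assumes `BloomFilterProperties` is importable from the package root as in recent deltalake releases):

```python
from deltalake import BloomFilterProperties

# Enable the bloom filter with library defaults
bloom_defaults = BloomFilterProperties(set_bloom_filter_enabled=True)

# Tune the false positive rate and the expected number of distinct values
bloom_tuned = BloomFilterProperties(set_bloom_filter_enabled=True, fpp=0.01, ndv=1_000_000)
```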

### ColumnProperties

```python { .api }
@dataclass
class ColumnProperties:
    def __init__(
        self,
        dictionary_enabled: bool | None = None,
        statistics_enabled: Literal["NONE", "CHUNK", "PAGE"] | None = None,
        bloom_filter_properties: BloomFilterProperties | None = None,
    ) -> None: ...

    dictionary_enabled: bool | None
    statistics_enabled: Literal["NONE", "CHUNK", "PAGE"] | None
    bloom_filter_properties: BloomFilterProperties | None
```

Configure per-column properties for parquet writing, including dictionary encoding, statistics collection levels, and bloom filter settings.

**Parameters:**

- `dictionary_enabled`: Enable dictionary encoding for the column
- `statistics_enabled`: Statistics level for the column
- `bloom_filter_properties`: Bloom filter configuration for the column
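
To show how these classes fit together, here is a sketch that attaches per-column settings to a write. It assumes `WriterProperties` accepts `default_column_properties` and `column_properties` arguments (as in recent deltalake releases); the table path, column names, and tuning values are illustrative.

```python
import pandas as pd
from deltalake import (
    BloomFilterProperties,
    ColumnProperties,
    WriterProperties,
    write_deltalake,
)

# Per-column settings: dictionary encoding, page-level statistics,
# and a tuned bloom filter for lookups on the "id" column
id_props = ColumnProperties(
    dictionary_enabled=True,
    statistics_enabled="PAGE",
    bloom_filter_properties=BloomFilterProperties(
        set_bloom_filter_enabled=True, fpp=0.01, ndv=100_000
    ),
)

writer_props = WriterProperties(
    compression="SNAPPY",
    default_column_properties=ColumnProperties(dictionary_enabled=True),
    column_properties={"id": id_props},  # apply the tuned settings to "id" only
)

data = pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})
write_deltalake("path/to/table", data, mode="append", writer_properties=writer_props)
```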