# Writing and Data Modification

Functions for writing data to Delta tables and modifying existing records through update, delete, and merge operations. Provides ACID transaction guarantees and accepts Arrow-compatible data sources such as pandas DataFrames, PyArrow tables, and record batches.

## Capabilities

### Writing Data

```python { .api }
def write_deltalake(
    table_or_uri: str | Path | DeltaTable,
    data: ArrowStreamExportable | ArrowArrayExportable | Sequence[ArrowArrayExportable],
    *,
    partition_by: list[str] | str | None = None,
    mode: Literal["error", "append", "overwrite", "ignore"] = "error",
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    schema_mode: Literal["merge", "overwrite"] | None = None,
    storage_options: dict[str, str] | None = None,
    target_file_size: int | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> None: ...
```

Write data to a Delta table with comprehensive configuration options.

**Parameters:**
- `table_or_uri`: Target table path or existing DeltaTable instance
- `data`: Data to write (pandas DataFrame, PyArrow Table, RecordBatch, etc.)
- `partition_by`: Column names for partitioning
- `mode`: Write behavior: `"error"` fails if the table already exists, `"append"` adds rows, `"overwrite"` replaces the table contents, `"ignore"` skips the write if the table exists
- `schema_mode`: How to handle schema differences: `"merge"` adds new columns, `"overwrite"` replaces the table schema (see the sketch after this list)
- `storage_options`: Backend-specific configuration (for example, cloud credentials)
- `writer_properties`: Parquet writer configuration
- `commit_properties`: Transaction commit settings
- `post_commithook_properties`: Post-commit operations
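
The `schema_mode` parameter controls what happens when incoming data does not match the existing table schema. A minimal sketch, assuming a current deltalake release where `schema_mode="merge"` is supported for appends; the path and column names are illustrative:

```python
import pandas as pd
from deltalake import write_deltalake

# Initial table with two columns
write_deltalake("path/to/evolving-table", pd.DataFrame({"id": [1, 2], "name": ["a", "b"]}))

# Later data carries an extra column; schema_mode="merge" adds it to the
# table schema instead of raising a schema-mismatch error.
newer = pd.DataFrame({"id": [3], "name": ["c"], "score": [0.5]})
write_deltalake("path/to/evolving-table", newer, mode="append", schema_mode="merge")
```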
### Converting Existing Tables

```python { .api }
def convert_to_deltalake(
    uri: str | Path,
    mode: Literal["error", "ignore"] = "error",
    partition_by: Schema | None = None,
    partition_strategy: Literal["hive"] | None = None,
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    storage_options: dict[str, str] | None = None,
    commit_properties: CommitProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
) -> None: ...
```

Convert an existing Parquet table to Delta Lake format without rewriting the data files. `partition_by` expects a PyArrow schema describing the partition columns and is used together with `partition_strategy="hive"`.
### Update Operations

```python { .api }
def update(
    self,
    updates: dict[str, str] | None = None,
    new_values: dict[str, int | float | str | datetime | bool | list[Any]] | None = None,
    predicate: str | None = None,
    writer_properties: WriterProperties | None = None,
    error_on_type_mismatch: bool = True,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]: ...
```

Update records in the Delta table matching an optional predicate. Returns a dictionary of operation metrics.

**Parameters:**
- `updates`: Column name to SQL expression mapping
- `new_values`: Column name to Python value mapping
- `predicate`: SQL WHERE clause for filtering rows to update
- `writer_properties`: Parquet writer configuration
- `error_on_type_mismatch`: Raise an error on type conflicts (default `True`)
### Delete Operations

```python { .api }
def delete(
    self,
    predicate: str | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> dict[str, Any]: ...
```

Delete records matching the specified predicate. If no predicate is given, all records are deleted. Returns a dictionary of operation metrics.
### Merge Operations

```python { .api }
def merge(
    self,
    source: pyarrow.Table | pyarrow.RecordBatch | pyarrow.dataset.Dataset,
    predicate: str,
    source_alias: str | None = None,
    target_alias: str | None = None,
    writer_properties: WriterProperties | None = None,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
) -> TableMerger: ...
```

Start a merge (UPSERT) operation against the specified source data. Returns a `TableMerger` builder; nothing is committed until `execute()` is called.
### TableMerger Class

```python { .api }
class TableMerger:
    def when_matched_update(
        self,
        updates: dict[str, str] | None = None,
        predicate: str | None = None,
    ) -> TableMerger: ...

    def when_matched_update_all(self, predicate: str | None = None) -> TableMerger: ...

    def when_matched_delete(self, predicate: str | None = None) -> TableMerger: ...

    def when_not_matched_insert(
        self,
        updates: dict[str, str] | None = None,
        predicate: str | None = None,
    ) -> TableMerger: ...

    def when_not_matched_insert_all(self, predicate: str | None = None) -> TableMerger: ...

    def execute(self) -> dict[str, Any]: ...
```

Builder pattern for constructing complex merge operations. Each `when_*` clause returns the merger itself, so clauses can be chained in any combination before `execute()` applies the merge; see the sketch below.
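
Not every merge is an upsert. A minimal sketch of a delete-only merge that removes target rows whose keys appear in the source; the table path and column names are illustrative:

```python
import pyarrow as pa
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")           # existing table (illustrative path)
keys_to_remove = pa.table({"id": [1, 4]})  # keys we want purged from the target

metrics = (
    dt.merge(
        source=keys_to_remove,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_delete()  # only clause: delete matches, leave everything else untouched
    .execute()
)
print(metrics)  # operation metrics dictionary returned by execute()
```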
## Usage Examples

### Basic Writing Operations

```python
from deltalake import write_deltalake, DeltaTable
import pandas as pd

# Create sample data
data = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'department': ['Engineering', 'Sales', 'Engineering', 'Sales', 'Marketing']
})

# Write to new table
write_deltalake("path/to/new-table", data, mode="error")

# Append more data
new_data = pd.DataFrame({
    'id': [6, 7],
    'name': ['Frank', 'Grace'],
    'age': [29, 27],
    'department': ['Engineering', 'Marketing']
})

write_deltalake("path/to/new-table", new_data, mode="append")

# Overwrite entire table
write_deltalake("path/to/new-table", data, mode="overwrite")
```
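
The same call can target object storage by passing backend credentials through `storage_options`. A sketch assuming an S3 bucket; the bucket name is illustrative and the option keys mirror the `convert_to_deltalake` example later in this document:

```python
import pandas as pd
from deltalake import write_deltalake

events = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})

# Option keys are backend-specific (here: AWS S3).
write_deltalake(
    "s3://my-bucket/delta/events",  # illustrative bucket and prefix
    events,
    storage_options={
        "AWS_REGION": "us-west-2",
        "AWS_ACCESS_KEY_ID": "key",
        "AWS_SECRET_ACCESS_KEY": "secret",
    },
)
```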
### Partitioned Writing

```python
import numpy as np

# Write with partitioning
write_deltalake(
    "path/to/partitioned-table",
    data,
    partition_by=["department"],
    mode="error"
)

# Write with multiple partition columns
sales_data = pd.DataFrame({
    'id': range(100),
    'sale_date': pd.date_range('2023-01-01', periods=100),
    'amount': np.random.randint(100, 1000, 100),
    'region': np.random.choice(['US', 'EU', 'ASIA'], 100)
})

write_deltalake(
    "path/to/sales-table",
    sales_data,
    partition_by=["region", "sale_date"],
    mode="error"
)
```
### Update Operations

```python
dt = DeltaTable("path/to/table")

# Update with new values
result = dt.update(
    predicate="age < 30",
    new_values={"department": "Junior Engineering"}
)
print(f"Updated {result['num_updated_rows']} rows")

# Update with SQL expressions
dt.update(
    predicate="department = 'Sales'",
    updates={"age": "age + 1"}  # Increment age by 1
)

# Conditional updates
dt.update(
    predicate="name = 'Alice'",
    new_values={
        "age": 26,
        "department": "Senior Engineering"
    }
)
```
### Delete Operations

```python
# Delete specific records
result = dt.delete(predicate="age > 35")
print(f"Deleted {result['num_deleted_rows']} rows")

# Delete all records from a department
dt.delete(predicate="department = 'Marketing'")

# Delete without predicate (truncate)
dt.delete()  # Deletes all rows
```
### Merge Operations (UPSERT)

```python
import pyarrow as pa

# Source data for merge
source_data = pa.table({
    'id': [2, 3, 6],
    'name': ['Bob Updated', 'Charlie Updated', 'New Person'],
    'age': [31, 36, 40],
    'department': ['Sales', 'Engineering', 'HR']
})

# Perform merge operation
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target"
    )
    .when_matched_update_all()      # Update all columns when matched
    .when_not_matched_insert_all()  # Insert when not matched
    .execute()
)

print("Merge completed:")
print(f"- Rows updated: {merge_result['num_target_rows_updated']}")
print(f"- Rows inserted: {merge_result['num_target_rows_inserted']}")
```
### Advanced Merge with Conditions

```python
# Complex merge with conditions
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target"
    )
    .when_matched_update(
        updates={
            "name": "source.name",
            "age": "source.age"
        },
        predicate="target.age < source.age"  # Only update if source age is higher
    )
    .when_matched_delete(
        predicate="source.department = 'DELETED'"  # Delete if marked for deletion
    )
    .when_not_matched_insert(
        updates={
            "id": "source.id",
            "name": "source.name",
            "age": "source.age",
            "department": "source.department"
        },
        predicate="source.age >= 18"  # Only insert adults
    )
    .execute()
)
```
### Converting Existing Tables

```python
import pyarrow as pa
from deltalake import convert_to_deltalake

# Convert a Hive-partitioned Parquet table to Delta
convert_to_deltalake(
    "path/to/parquet-table",
    # Declare the partition columns and their types as a PyArrow schema
    partition_by=pa.schema([("year", pa.int32()), ("month", pa.int32())]),
    partition_strategy="hive",
    name="My Converted Table",
    description="Converted from Parquet format"
)

# Convert a table stored on object storage
convert_to_deltalake(
    "s3://bucket/parquet-data",
    storage_options={
        "AWS_REGION": "us-west-2",
        "AWS_ACCESS_KEY_ID": "key",
        "AWS_SECRET_ACCESS_KEY": "secret"
    }
)
```
### Write Configuration

```python
from deltalake import WriterProperties

# Configure writer properties
writer_props = WriterProperties(
    compression="SNAPPY",
    max_row_group_size=10000,
    write_batch_size=1000
)

# Write with custom properties
write_deltalake(
    "path/to/table",
    data,
    writer_properties=writer_props,
    target_file_size=64 * 1024 * 1024,  # 64MB files
    configuration={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
```
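
Commit-level settings travel separately from the Parquet writer settings. A sketch of attaching custom commit metadata through `CommitProperties`; the `custom_metadata` keyword and the top-level import are assumptions based on current deltalake releases, so verify them against your installed version:

```python
import pandas as pd
from deltalake import CommitProperties, write_deltalake

batch = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})

# Custom metadata is recorded in the commit info of the new table version,
# which helps audit which job produced it.
commit_props = CommitProperties(custom_metadata={"pipeline": "nightly-load", "owner": "data-eng"})

write_deltalake(
    "path/to/table",
    batch,
    mode="append",
    commit_properties=commit_props,
)
```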
## Writer Configuration Classes

### BloomFilterProperties

```python { .api }
@dataclass
class BloomFilterProperties:
    def __init__(
        self,
        set_bloom_filter_enabled: bool | None,
        fpp: float | None = None,
        ndv: int | None = None,
    ) -> None: ...

    set_bloom_filter_enabled: bool | None
    fpp: float | None
    ndv: int | None
```

Configure bloom filter properties for Parquet writer optimization. Bloom filters help skip data files during reads by providing probabilistic membership testing.

**Parameters:**
- `set_bloom_filter_enabled`: Enable the bloom filter with default values if no `fpp`/`ndv` is provided
- `fpp`: False positive probability (between 0 and 1, exclusive)
- `ndv`: Number of distinct values for the bloom filter
### ColumnProperties

```python { .api }
@dataclass
class ColumnProperties:
    def __init__(
        self,
        dictionary_enabled: bool | None = None,
        statistics_enabled: Literal["NONE", "CHUNK", "PAGE"] | None = None,
        bloom_filter_properties: BloomFilterProperties | None = None,
    ) -> None: ...

    dictionary_enabled: bool | None
    statistics_enabled: Literal["NONE", "CHUNK", "PAGE"] | None
    bloom_filter_properties: BloomFilterProperties | None
```

Configure per-column properties for Parquet writing, including dictionary encoding, statistics collection levels, and bloom filter settings. A wiring sketch follows the parameter list below.

**Parameters:**
- `dictionary_enabled`: Enable dictionary encoding for the column
- `statistics_enabled`: Statistics level for the column
- `bloom_filter_properties`: Bloom filter configuration for the column
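
A minimal sketch of wiring these classes into a write. It assumes `WriterProperties` accepts `default_column_properties` and `column_properties` keywords and that these classes are re-exported from the top-level `deltalake` package, as in current releases; verify against your installed version:

```python
import pandas as pd
from deltalake import (
    BloomFilterProperties,
    ColumnProperties,
    WriterProperties,
    write_deltalake,
)

rows = pd.DataFrame({"id": [1, 2, 3], "payload": ["a", "b", "c"]})

# Bloom filter tuned for a lookup column; fpp/ndv values are illustrative.
id_bloom = BloomFilterProperties(set_bloom_filter_enabled=True, fpp=0.01, ndv=100_000)

writer_props = WriterProperties(
    compression="ZSTD",
    # Applied to every column unless overridden below (assumed keyword).
    default_column_properties=ColumnProperties(dictionary_enabled=True),
    # Per-column overrides keyed by column name (assumed keyword).
    column_properties={
        "id": ColumnProperties(statistics_enabled="PAGE", bloom_filter_properties=id_bloom),
    },
)

write_deltalake("path/to/bloom-table", rows, writer_properties=writer_props)
```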