# Transaction Management

Commit properties, post-commit hook settings, and ACID transaction control for keeping Delta Lake tables consistent and managing concurrent access to them.

## Capabilities

### Commit Properties

```python { .api }
9
@dataclass
10
class CommitProperties:
11
def __init__(
12
self,
13
max_retry_commit_attempts: int | None = None,
14
app_metadata: dict[str, Any] | None = None,
15
app_id: str | None = None
16
) -> None: ...
17
18
@property
19
def max_retry_commit_attempts(self) -> int | None: ...
20
21
@property
22
def app_metadata(self) -> dict[str, Any] | None: ...
23
24
@property
25
def app_id(self) -> str | None: ...
26
```
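
Configuration for a single commit: the maximum number of commit retry attempts, custom application metadata recorded with the commit, and an identifier for the writing application. Every field is optional and defaults to `None`.

### Post-Commit Hook Properties
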
```python { .api }
33
@dataclass
34
class PostCommitHookProperties:
35
def __init__(
36
self,
37
create_checkpoint: bool = True,
38
cleanup_expired_logs: bool | None = None,
39
) -> None: ...
40
41
@property
42
def create_checkpoint(self) -> bool: ...
43
44
@property
45
def cleanup_expired_logs(self) -> bool | None: ...
46
```
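
Configuration for the hooks that run after a successful commit: whether to write a checkpoint (`create_checkpoint`, `True` by default) and whether to clean up expired log files (`cleanup_expired_logs`, `None` by default).

### Transaction Class
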
```python { .api }
class Transaction:
    """Low-level handle for committing file actions to a Delta table."""

    def commit(
        self,
        actions: list[AddAction],
        commit_properties: CommitProperties | None = None,
        post_commit_hook_properties: PostCommitHookProperties | None = None,
    ) -> int:
        """Record ``actions`` in the transaction log.

        Args:
            actions: The file additions that make up this commit.
            commit_properties: Optional retry and metadata settings for the
                commit.
            post_commit_hook_properties: Optional checkpoint / log-cleanup
                settings applied once the commit succeeds.

        Returns:
            The table version produced by the commit.
        """
        ...
```

Low-level transaction interface for advanced operations. `commit` records a list of `AddAction`s in the transaction log and returns the resulting table version.

### Add Action

```python { .api }
67
@dataclass
68
class AddAction:
69
path: str
70
size: int
71
partition_values: Mapping[str, str | None]
72
modification_time: int
73
data_change: bool
74
stats: str
75
```
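
Represents the addition of one data file in the transaction log: its path, size, partition values, modification time (milliseconds since the Unix epoch), whether it changes the table's data, and its statistics as a JSON string.

### Transaction Helper Functions
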
```python { .api }
def create_table_with_add_actions(
    table_uri: str,
    schema: Schema,
    add_actions: list[AddAction],
    partition_by: list[str] | None = None,
    name: str | None = None,
    description: str | None = None,
    configuration: dict[str, str | None] | None = None,
    storage_options: dict[str, str] | None = None,
) -> DeltaTable:
    """Create a new table at ``table_uri`` from pre-built add actions.

    Args:
        table_uri: Location of the table to create.
        schema: Schema of the new table.
        add_actions: Data files to register in the table's first commit.
        partition_by: Optional partition column names.
        name: Optional table name.
        description: Optional table description.
        configuration: Optional table configuration entries.
        storage_options: Optional options for the underlying storage backend.

    Returns:
        A ``DeltaTable`` for the newly created table.
    """
    ...
```

Creates a new table directly from add actions, registering the data files they describe under the given schema (advanced use case).

## Usage Examples

### Basic Transaction Configuration

```python
import pyarrow as pa

from deltalake import DeltaTable, write_deltalake
from deltalake.transaction import CommitProperties, PostCommitHookProperties

# Sample data to append (the original example used an undefined `data`)
data = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Configure commit properties
commit_props = CommitProperties(
    max_retry_commit_attempts=5,
    app_metadata={"application": "data_pipeline", "version": "1.0.0"},
    app_id="my_application",
)

# Configure post-commit hooks
post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,  # Create checkpoints automatically
    cleanup_expired_logs=True,  # Clean up old log files
)

# Use in write operation
write_deltalake(
    "path/to/table",
    data,
    mode="append",
    commit_properties=commit_props,
    post_commithook_properties=post_commit_props,
)
```

### Transaction Properties in Updates

```python
dt = DeltaTable("path/to/table")

# Update with transaction properties.
# `updates` maps columns to SQL expressions, so the string literal carries its
# own quotes and current_timestamp() is evaluated by the query engine.
# (`new_values` would store the given Python values verbatim, i.e. the literal
# text "current_timestamp()".)
result = dt.update(
    predicate="status = 'pending'",
    updates={"status": "'processed'", "processed_at": "current_timestamp()"},
    commit_properties=CommitProperties(
        max_retry_commit_attempts=3,
        app_metadata={"operation": "batch_update", "batch_id": "12345"},
    ),
    post_commithook_properties=PostCommitHookProperties(
        create_checkpoint=False,  # Skip checkpoint for this update
        cleanup_expired_logs=False,
    ),
)

print(f"Updated {result['num_updated_rows']} rows")
```

### Merge with Transaction Control

```python
import pyarrow as pa

source_data = pa.table({
    'id': [1, 2, 3],
    'status': ['active', 'inactive', 'pending'],
    'last_modified': ['2023-01-01', '2023-01-02', '2023-01-03'],
})

# Merge with custom transaction properties.
# The aliases name the two sides referenced as `source.` / `target.` in the
# predicate; without them the predicate cannot be resolved.
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
        commit_properties=CommitProperties(
            max_retry_commit_attempts=10,  # High retry for critical operation
            app_metadata={
                "operation": "daily_sync",
                "source": "external_system",
                "sync_timestamp": "2023-01-15T10:00:00Z",
            },
        ),
        post_commithook_properties=PostCommitHookProperties(
            create_checkpoint=True,
            cleanup_expired_logs=True,
        ),
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
```

### Delete with Transaction Properties

```python
# Delete with tracking metadata: build the commit settings up front, then
# hand them to the delete call.
retention_commit = CommitProperties(
    app_metadata={
        "operation": "data_retention_cleanup",
        "retention_policy": "delete_older_than_1_year",
        "executed_by": "automated_cleanup_job",
    },
)
retention_hooks = PostCommitHookProperties(
    create_checkpoint=True,  # Create checkpoint after cleanup
    cleanup_expired_logs=True,  # Clean logs after delete
)

delete_result = dt.delete(
    predicate="created_at < '2022-01-01'",
    commit_properties=retention_commit,
    post_commithook_properties=retention_hooks,
)

print(f"Deleted {delete_result['num_deleted_rows']} old records")
```

### Advanced Transaction Handling

```python
from deltalake import DeltaTable
from deltalake.transaction import AddAction, CommitProperties, Transaction


# Low-level transaction for advanced scenarios
def custom_transaction_example():
    """Commit a manually described data file to an existing table.

    Returns:
        The table version produced by the commit.
    """
    dt = DeltaTable("path/to/table")

    # Create add actions manually (advanced use case)
    add_actions = [
        AddAction(
            path="data/part-00001.parquet",
            size=1024000,
            partition_values={"year": "2023", "month": "01"},
            modification_time=1672531200000,  # Unix timestamp in milliseconds
            data_change=True,
            stats='{"numRecords": 1000}',
        )
    ]

    # Get transaction and commit.
    # NOTE: `_table` is an internal API and may change without notice.
    transaction: Transaction = dt._table.create_transaction()

    commit_props = CommitProperties(
        max_retry_commit_attempts=3,
        app_metadata={"custom_operation": "manual_file_addition"},
    )

    # Commit the transaction
    version = transaction.commit(
        add_actions,
        commit_properties=commit_props,
    )

    print(f"Committed transaction at version {version}")
    return version
```

### Retry and Conflict Handling

```python
import time

from deltalake import DeltaTable
from deltalake.exceptions import CommitFailedError
from deltalake.transaction import CommitProperties


def robust_update_with_retry(table_uri="path/to/table", max_retries=5, base_delay=1.0):
    """Mark 'processing' rows as 'completed', retrying on commit conflicts.

    Failed attempts are retried with exponential backoff
    (``base_delay * 2 ** attempt`` seconds), reloading the table before
    each new attempt.

    Args:
        table_uri: Location of the Delta table to update.
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Delay in seconds before the first retry.

    Returns:
        The result returned by ``DeltaTable.update``.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        CommitFailedError: If every attempt fails to commit.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    dt = DeltaTable(table_uri)

    for attempt in range(max_retries):
        try:
            result = dt.update(
                predicate="status = 'processing'",
                new_values={"status": "completed"},
                commit_properties=CommitProperties(
                    # Keep library-level retries minimal; this loop does the rest
                    max_retry_commit_attempts=1,
                    app_metadata={
                        "attempt": attempt + 1,
                        "operation": "status_update",
                    },
                ),
            )
        except CommitFailedError:
            if attempt == max_retries - 1:
                print(f"Update failed after {max_retries} attempts")
                raise

            # Exponential backoff
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)

            # Reload table to get latest state
            dt = DeltaTable(table_uri)
        else:
            print(f"Update succeeded on attempt {attempt + 1}")
            return result


# Use the robust update function. Only commit failures are reported as
# exhausted retries; any other error (e.g. a missing table) propagates.
try:
    result = robust_update_with_retry()
except CommitFailedError as e:
    print(f"All retry attempts failed: {e}")
else:
    print(f"Successfully updated {result['num_updated_rows']} rows")
```

### Monitoring Transaction Metadata

```python
from deltalake import DeltaTable


def analyze_transaction_history(table_uri="path/to/table", limit=10):
    """Print version, operation, timestamp, app metadata and metrics of recent commits.

    Args:
        table_uri: Location of the Delta table to inspect.
        limit: Maximum number of history entries to request.
    """
    dt = DeltaTable(table_uri)

    # Get commit history
    history = dt.history(limit=limit)

    print("Recent transaction history:")
    for commit in history:
        version = commit.get("version")
        operation = commit.get("operation", "unknown")
        timestamp = commit.get("timestamp")

        # Extract app metadata if present. `or {}` also covers entries where
        # the key exists but its value is null.
        operation_parameters = commit.get("operationParameters") or {}
        # NOTE(review): where custom commit metadata appears in a history
        # entry may depend on the library version (it may be stored as
        # top-level keys of `commit` instead) — confirm against your version.
        app_metadata = operation_parameters.get("app_metadata")

        print(f"Version {version}: {operation} at {timestamp}")

        if app_metadata:
            print(f" App metadata: {app_metadata}")

        # Show operation metrics (tolerates a missing or null entry)
        metrics = commit.get("operationMetrics") or {}
        for key, value in metrics.items():
            print(f" {key}: {value}")

        print()


# Run analysis
analyze_transaction_history()
```

### Checkpoint and Log Management

```python
# Manual checkpoint creation
dt.create_checkpoint()
print("Checkpoint created successfully")

# Cleanup metadata (removes old log files)
dt.cleanup_metadata()
print("Old metadata cleaned up")

# Configure post-commit hooks for a write
post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,
    cleanup_expired_logs=True,
)

# These settings apply only to the call they are passed to. To use them for
# other operations, pass `post_commit_props` to each of those calls as well.
# `new_data` stands for the data to append (e.g. a pyarrow Table).
write_deltalake(
    dt,
    new_data,
    mode="append",
    post_commithook_properties=post_commit_props,
)
```