# Table Maintenance

Operations for table optimization, vacuum cleanup, and checkpoint management to maintain table performance and storage efficiency over time.

## Imports

```python
from deltalake import DeltaTable, WriterProperties, PostCommitHookProperties, CommitProperties
from datetime import timedelta
from typing import Any, Iterable
```

## Capabilities

### Vacuum Operations

```python { .api }
def vacuum(
    self,
    retention_hours: int | None = None,
    dry_run: bool = True,
    enforce_retention_duration: bool = True,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
    full: bool = False,
    keep_versions: list[int] | None = None,
) -> list[str]: ...
```

Clean up files that are no longer referenced by the Delta table and are older than the retention threshold.

**Parameters:**
- `retention_hours`: Retention threshold in hours (falls back to the table configuration when None)
- `dry_run`: List the files that would be deleted without removing them when True
- `enforce_retention_duration`: When True, reject a `retention_hours` value shorter than the table's minimum retention period
- `full`: Remove all unreferenced files when True, regardless of retention
- `keep_versions`: Specific table versions whose files should be preserved
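
If you are unsure what retention the table is configured with, you can inspect its properties before choosing a `retention_hours` value. A minimal sketch; the standard `delta.deletedFileRetentionDuration` property only appears in the configuration when it has been set explicitly on the table:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Table properties are exposed as a plain dict of strings.
config = dt.metadata().configuration
retention = config.get("delta.deletedFileRetentionDuration")
print(f"Configured retention: {retention or 'not set (library default applies)'}")

# Dry run first: returns the list of files that would be deleted.
candidates = dt.vacuum(dry_run=True)
print(f"{len(candidates)} files eligible for deletion")
```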

### Checkpoint Management

```python { .api }
def create_checkpoint(self) -> None: ...

def cleanup_metadata(self) -> None: ...
```

Create checkpoints and clean up transaction log metadata.
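
Writers normally create checkpoints on their own, but a long run of small commits can still leave many JSON log entries for readers to replay. A minimal cadence-based sketch; the 100-version interval is an arbitrary example value, not a library default:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Arbitrary example cadence: force a checkpoint every 100 table versions so
# that log replay stays cheap for new readers.
CHECKPOINT_EVERY = 100

if dt.version() % CHECKPOINT_EVERY == 0:
    dt.create_checkpoint()
    # Remove log entries that have passed the configured log retention period.
    dt.cleanup_metadata()
```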

### Table Optimization

```python { .api }
@property
def optimize(self) -> TableOptimizer: ...

class TableOptimizer:
    def compact(
        self,
        partition_filters: list[tuple[str, str, Any]] | None = None,
        target_size: int | None = None,
        max_concurrent_tasks: int | None = None,
        min_commit_interval: int | timedelta | None = None,
        writer_properties: WriterProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> dict[str, Any]: ...

    def z_order(
        self,
        columns: Iterable[str],
        partition_filters: list[tuple[str, str, Any]] | None = None,
        target_size: int | None = None,
        max_concurrent_tasks: int | None = None,
        max_spill_size: int = 20 * 1024 * 1024 * 1024,
        min_commit_interval: int | timedelta | None = None,
        writer_properties: WriterProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> dict[str, Any]: ...
```

File compaction and Z-ordering for query performance optimization. `optimize` is a property that returns a `TableOptimizer` bound to the table.
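
For long-running optimization jobs on large tables, `min_commit_interval` commits intermediate results instead of holding everything for a single commit at the end of the run. A sketch based on the signature above; the 10-minute interval and 256MB target are arbitrary example values:

```python
from datetime import timedelta

from deltalake import DeltaTable

dt = DeltaTable("path/to/large_table")

# Commit intermediate optimization results roughly every 10 minutes instead of
# holding everything for one large commit at the end of the run.
metrics = dt.optimize.compact(
    target_size=256 * 1024 * 1024,  # 256MB target file size
    min_commit_interval=timedelta(minutes=10),
)
print(metrics)
```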

### Repair Operations

```python { .api }
def repair(
    self,
    dry_run: bool = True
) -> dict[str, Any]: ...
```

Repair the table by checking for files that are referenced in the transaction log but missing from storage, and removing the dangling references.
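
A minimal dry-run-first sketch. The exact keys in the returned metrics dictionary depend on the library version; recent releases report removed references under `files_removed`:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Dry run: report dangling file references without committing a fix.
report = dt.repair(dry_run=True)
print(report)

# If anything was flagged, run the repair for real.
if report.get("files_removed"):
    dt.repair(dry_run=False)
```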

## Usage Examples

### Basic Vacuum Operations

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Dry run to see what would be deleted
files_to_delete = dt.vacuum(dry_run=True)
print(f"Would delete {len(files_to_delete)} files:")
for file in files_to_delete[:5]:  # Show first 5
    print(f"  {file}")

# Actually delete old files (7 days retention)
deleted_files = dt.vacuum(
    retention_hours=7 * 24,  # 7 days
    dry_run=False
)
print(f"Deleted {len(deleted_files)} files")

# Vacuum with custom retention (1 day, requires disabling enforcement)
deleted_files = dt.vacuum(
    retention_hours=24,  # 1 day
    dry_run=False,
    enforce_retention_duration=False  # Allow shorter retention
)
```

### Advanced Vacuum Configuration

```python
from deltalake.transaction import CommitProperties, PostCommitHookProperties

# Vacuum with transaction properties
commit_props = CommitProperties(
    custom_metadata={
        "operation": "scheduled_vacuum",
        "retention_policy": "30_days"
    }
)

post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,
    cleanup_expired_logs=True
)

deleted_files = dt.vacuum(
    retention_hours=30 * 24,  # 30 days
    dry_run=False,
    commit_properties=commit_props,
    post_commithook_properties=post_commit_props
)

print(f"Vacuum completed, deleted {len(deleted_files)} files")
```

### Full Vacuum and Version Preservation

```python
# Full vacuum removes ALL unreferenced files
deleted_files = dt.vacuum(
    dry_run=False,
    full=True  # Remove all unreferenced files regardless of time
)

# Vacuum while preserving specific versions
important_versions = [10, 15, 20]  # Versions to keep
deleted_files = dt.vacuum(
    retention_hours=7 * 24,
    dry_run=False,
    keep_versions=important_versions
)

print(f"Deleted {len(deleted_files)} files while preserving versions {important_versions}")
```

### Checkpoint Management

```python
# Create checkpoint manually
dt.create_checkpoint()
print("Checkpoint created")

# Clean up old metadata files
dt.cleanup_metadata()
print("Metadata cleaned up")

# Check current version and checkpoint status
current_version = dt.version()
print(f"Current version: {current_version}")

# Get history to see checkpoint information
history = dt.history(limit=5)
for commit in history:
    version = commit.get("version")
    operation = commit.get("operation")
    print(f"Version {version}: {operation}")
```
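
How much log history `cleanup_metadata()` retains is governed by the `delta.logRetentionDuration` table property. A sketch, assuming your deltalake version exposes `DeltaTable.alter.set_table_properties` for setting table properties:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Shorten the log retention window before cleaning up metadata. The property
# name is the standard Delta table property; the alter API is assumed to be
# available in your deltalake version.
dt.alter.set_table_properties({"delta.logRetentionDuration": "interval 7 days"})

dt.create_checkpoint()
dt.cleanup_metadata()  # removes log files older than the retention period
```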

### Table Optimization

```python
# Basic file compaction (`optimize` is a property, not a method)
optimizer = dt.optimize
compact_result = optimizer.compact()

print("Compaction results:")
print(f"  Files added: {compact_result.get('numFilesAdded', 0)}")
print(f"  Files removed: {compact_result.get('numFilesRemoved', 0)}")
print(f"  Partitions optimized: {compact_result.get('partitionsOptimized', 0)}")

# Compact specific partitions
compact_result = optimizer.compact(
    partition_filters=[("year", "=", "2023"), ("month", "=", "01")],
    target_size=128 * 1024 * 1024  # 128MB target file size
)

# Z-order optimization for better query performance
zorder_result = optimizer.z_order(
    columns=["customer_id", "order_date"],  # Columns to optimize for
    target_size=256 * 1024 * 1024  # 256MB target files
)

print("\nZ-order optimization results:")
print(f"  Files added: {zorder_result.get('numFilesAdded', 0)}")
print(f"  Files removed: {zorder_result.get('numFilesRemoved', 0)}")
```
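
`WriterProperties` (imported above) controls how the rewritten Parquet files are encoded. A sketch passing ZSTD compression to compaction; the `compression` and `compression_level` arguments are assumed to be available in your deltalake version:

```python
from deltalake import DeltaTable, WriterProperties

dt = DeltaTable("path/to/table")

# Rewrite compacted files as ZSTD-compressed Parquet. Adjust the arguments to
# whatever writer options your deltalake version actually exposes.
props = WriterProperties(compression="ZSTD", compression_level=3)

result = dt.optimize.compact(
    target_size=128 * 1024 * 1024,
    writer_properties=props,
)
print(result)
```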

### Partition-Specific Optimization

```python
# Optimize only recent partitions
recent_partitions = [
    ("year", "=", "2023"),
    ("month", ">=", "10")
]

compact_result = optimizer.compact(
    partition_filters=recent_partitions,
    target_size=64 * 1024 * 1024,  # 64MB files
    max_concurrent_tasks=4  # Parallel optimization
)

# Z-order specific partitions with different columns
high_traffic_partitions = [("region", "=", "US")]

zorder_result = optimizer.z_order(
    columns=["user_id", "timestamp", "event_type"],
    partition_filters=high_traffic_partitions,
    max_concurrent_tasks=8
)
```
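
Before optimizing a partition, it can help to look at its current file layout. A sketch using `get_add_actions(flatten=True)`, which exposes partition values as `partition.<column>` columns and file sizes as `size_bytes`; the `region` column is the hypothetical partition column from the example above:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Inspect the current file layout of one partition before deciding to optimize.
actions = dt.get_add_actions(flatten=True).to_pandas()
us_files = actions[actions["partition.region"] == "US"]
print(f"{len(us_files)} files in region=US, "
      f"median size {us_files['size_bytes'].median() / 1024 / 1024:.1f} MB")
```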

### Comprehensive Maintenance Routine

```python
def comprehensive_maintenance(table_path: str, retention_days: int = 7):
    """Perform comprehensive table maintenance"""
    dt = DeltaTable(table_path)

    print(f"Starting maintenance for {table_path}")
    print(f"Current version: {dt.version()}")

    # 1. Create checkpoint
    print("\n1. Creating checkpoint...")
    dt.create_checkpoint()

    # 2. Optimize table
    print("\n2. Optimizing table...")
    optimizer = dt.optimize

    # Compact small files
    compact_result = optimizer.compact(target_size=128 * 1024 * 1024)
    print(f"  Compaction: {compact_result.get('numFilesRemoved', 0)} files removed, "
          f"{compact_result.get('numFilesAdded', 0)} files added")

    # Z-order on common query columns (example)
    try:
        zorder_result = optimizer.z_order(columns=["id", "created_date"])
        print(f"  Z-order: {zorder_result.get('numFilesRemoved', 0)} files removed, "
              f"{zorder_result.get('numFilesAdded', 0)} files added")
    except Exception as e:
        print(f"  Z-order failed (columns may not exist): {e}")

    # 3. Vacuum old files
    print(f"\n3. Vacuuming files older than {retention_days} days...")
    retention_hours = retention_days * 24

    # Dry run first
    files_to_delete = dt.vacuum(retention_hours=retention_hours, dry_run=True)
    print(f"  Would delete {len(files_to_delete)} files")

    # Actually delete
    deleted_files = []
    if files_to_delete:
        deleted_files = dt.vacuum(retention_hours=retention_hours, dry_run=False)
        print(f"  Deleted {len(deleted_files)} files")

    # 4. Clean up metadata
    print("\n4. Cleaning up metadata...")
    dt.cleanup_metadata()

    # 5. Final status
    print(f"\nMaintenance completed. Final version: {dt.version()}")

    return {
        "compaction": compact_result,
        "vacuum_deleted": len(deleted_files)
    }

# Run comprehensive maintenance
maintenance_result = comprehensive_maintenance("path/to/table", retention_days=14)
```

### Scheduled Maintenance

```python
import schedule
import time
from datetime import datetime

def scheduled_maintenance():
    """Maintenance job for scheduled execution"""
    tables_to_maintain = [
        "path/to/table1",
        "path/to/table2",
        "path/to/table3"
    ]

    for table_path in tables_to_maintain:
        try:
            print(f"\n{datetime.now()}: Maintaining {table_path}")
            result = comprehensive_maintenance(table_path, retention_days=30)
            print(f"Maintenance completed for {table_path}")
        except Exception as e:
            print(f"Maintenance failed for {table_path}: {e}")

def vacuum_only_maintenance():
    """Daily vacuum-only maintenance"""
    tables = ["path/to/high_traffic_table1", "path/to/high_traffic_table2"]

    for table_path in tables:
        try:
            dt = DeltaTable(table_path)
            deleted = dt.vacuum(retention_hours=7 * 24, dry_run=False)
            print(f"Daily vacuum for {table_path}: deleted {len(deleted)} files")
        except Exception as e:
            print(f"Daily vacuum failed for {table_path}: {e}")

# Schedule maintenance jobs
schedule.every().sunday.at("02:00").do(scheduled_maintenance)  # Weekly
schedule.every().day.at("01:00").do(vacuum_only_maintenance)  # Daily vacuum

# Run scheduler (in production, this would be in a separate service)
# while True:
#     schedule.run_pending()
#     time.sleep(3600)  # Check every hour
```

### Table Repair and Health Checks

```python
def table_health_check(table_path: str):
    """Comprehensive table health check"""
    dt = DeltaTable(table_path)

    print(f"Health check for {table_path}")
    print(f"Current version: {dt.version()}")

    # Check basic table properties
    try:
        schema = dt.schema()
        metadata = dt.metadata()
        files = dt.files()

        print(f"Schema fields: {len(schema.fields)}")
        print(f"Table files: {len(files)}")
        print(f"Partition columns: {metadata.partition_columns}")
    except Exception as e:
        print(f"Error reading table properties: {e}")
        return False

    # Check for repair needs
    try:
        repair_report = dt.repair(dry_run=True)
        missing_files = repair_report.get("files_removed") or []

        if missing_files:
            print(f"Found {len(missing_files)} dangling file references that need repair")

            # Actually repair if needed
            repair_result = dt.repair(dry_run=False)
            print(f"Repair completed: {repair_result}")
        else:
            print("No repair issues found")
    except Exception as e:
        print(f"Repair check failed: {e}")

    # Check file size distribution
    try:
        add_actions = dt.get_add_actions()
        sizes_df = add_actions.to_pandas()

        if not sizes_df.empty:
            avg_size = sizes_df['size_bytes'].mean()
            min_size = sizes_df['size_bytes'].min()
            max_size = sizes_df['size_bytes'].max()

            print(f"File sizes - Avg: {avg_size/1024/1024:.1f}MB, "
                  f"Min: {min_size/1024/1024:.1f}MB, Max: {max_size/1024/1024:.1f}MB")

            # Recommend optimization if needed
            if (min_size > 0 and max_size / min_size > 100) or avg_size < 10 * 1024 * 1024:  # 10MB
                print("Recommendation: Consider running optimize.compact()")
    except Exception as e:
        print(f"File analysis failed: {e}")

    return True

# Run health checks
tables_to_check = ["path/to/table1", "path/to/table2"]
for table_path in tables_to_check:
    table_health_check(table_path)
    print("-" * 50)
```