
# Table Maintenance

Operations for table optimization, vacuum cleanup, and checkpoint management to maintain table performance and storage efficiency over time.

## Imports

```python
from deltalake import DeltaTable, WriterProperties, PostCommitHookProperties, CommitProperties
from datetime import timedelta
from typing import Iterable
```

## Capabilities

### Vacuum Operations

```python { .api }
def vacuum(
    self,
    retention_hours: int | None = None,
    dry_run: bool = True,
    enforce_retention_duration: bool = True,
    post_commithook_properties: PostCommitHookProperties | None = None,
    commit_properties: CommitProperties | None = None,
    full: bool = False,
    keep_versions: list[int] | None = None,
) -> list[str]: ...
```

Clean up files no longer referenced by the Delta table and older than the retention threshold.

**Parameters:**

- `retention_hours`: Retention threshold in hours (uses table config if None)
- `dry_run`: List files without deleting when True
- `enforce_retention_duration`: Enforce minimum retention when True
- `full`: Remove all unreferenced files when True
- `keep_versions`: Specific versions to preserve
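
The return value is the list of affected file paths; in a dry run these are the candidates that would be removed. A minimal sketch, assuming a placeholder table path:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # placeholder path

# With retention_hours=None the threshold comes from the table's
# configured retention (the table-config fallback described above).
candidates = dt.vacuum(dry_run=True)
print(f"{len(candidates)} files are eligible for removal")
```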

### Checkpoint Management

```python { .api }
def create_checkpoint(self) -> None: ...

def cleanup_metadata(self) -> None: ...
```

Create checkpoints and clean up transaction log metadata.
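
As a sketch of how these might be combined, a checkpoint can be created on a fixed version cadence before trimming the log (the 20-version interval is an arbitrary choice for illustration, not a library default):

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # placeholder path

# Create a checkpoint every 20 versions (arbitrary interval), then remove
# transaction log files that have passed the log retention period.
if dt.version() % 20 == 0:
    dt.create_checkpoint()
    dt.cleanup_metadata()
```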

### Table Optimization

```python { .api }
def optimize(self) -> TableOptimizer: ...

class TableOptimizer:
    def compact(
        self,
        partition_filters: list[tuple[str, str, Any]] | None = None,
        target_size: int | None = None,
        max_concurrent_tasks: int | None = None,
        min_commit_interval: int | timedelta | None = None,
        writer_properties: WriterProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> dict[str, Any]: ...

    def z_order(
        self,
        columns: Iterable[str],
        partition_filters: list[tuple[str, str, Any]] | None = None,
        target_size: int | None = None,
        max_concurrent_tasks: int | None = None,
        max_spill_size: int = 20 * 1024 * 1024 * 1024,
        min_commit_interval: int | timedelta | None = None,
        writer_properties: WriterProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> dict[str, Any]: ...
```

File compaction and Z-ordering for query performance optimization.
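
The `min_commit_interval` parameter accepts either a number of seconds or a `timedelta`, so a long-running optimize over many partitions can land as a series of smaller commits rather than a single commit at the end. A minimal sketch using the signatures above (the table path is a placeholder):

```python
from datetime import timedelta
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # placeholder path
optimizer = dt.optimize()

# Commit optimization progress roughly every 10 minutes instead of
# waiting for the entire compaction to finish.
result = optimizer.compact(min_commit_interval=timedelta(minutes=10))
print(result)
```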

### Repair Operations

```python { .api }
def repair_table(
    self,
    dry_run: bool = True
) -> dict[str, Any]: ...
```

Repair the table by checking for inconsistencies and fixing them.
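
A minimal sketch of a dry run before applying any fixes (the table path is a placeholder; the exact keys of the returned dictionary depend on the metrics reported by the operation):

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # placeholder path

# Report inconsistencies without modifying the table, then decide
# whether to run the repair for real with dry_run=False.
report = dt.repair_table(dry_run=True)
print(report)
```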

## Usage Examples

### Basic Vacuum Operations

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Dry run to see what would be deleted
files_to_delete = dt.vacuum(dry_run=True)
print(f"Would delete {len(files_to_delete)} files:")
for file in files_to_delete[:5]:  # Show first 5
    print(f"  {file}")

# Actually delete old files (7 days retention)
deleted_files = dt.vacuum(
    retention_hours=7 * 24,  # 7 days
    dry_run=False
)
print(f"Deleted {len(deleted_files)} files")

# Vacuum with custom retention (1 day, requires disabling enforcement)
deleted_files = dt.vacuum(
    retention_hours=24,  # 1 day
    dry_run=False,
    enforce_retention_duration=False  # Allow shorter retention
)
```

### Advanced Vacuum Configuration

```python
from deltalake.transaction import CommitProperties, PostCommitHookProperties

# Vacuum with transaction properties
commit_props = CommitProperties(
    app_metadata={
        "operation": "scheduled_vacuum",
        "retention_policy": "30_days"
    }
)

post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,
    cleanup_expired_logs=True
)

deleted_files = dt.vacuum(
    retention_hours=30 * 24,  # 30 days
    dry_run=False,
    commit_properties=commit_props,
    post_commithook_properties=post_commit_props
)

print(f"Vacuum completed, deleted {len(deleted_files)} files")
```

### Full Vacuum and Version Preservation

```python
# Full vacuum removes ALL unreferenced files
deleted_files = dt.vacuum(
    dry_run=False,
    full=True  # Remove all unreferenced files regardless of time
)

# Vacuum while preserving specific versions
important_versions = [10, 15, 20]  # Versions to keep
deleted_files = dt.vacuum(
    retention_hours=7 * 24,
    dry_run=False,
    keep_versions=important_versions
)

print(f"Deleted {len(deleted_files)} files while preserving versions {important_versions}")
```

### Checkpoint Management

```python
# Create checkpoint manually
dt.create_checkpoint()
print("Checkpoint created")

# Clean up old metadata files
dt.cleanup_metadata()
print("Metadata cleaned up")

# Check current version and checkpoint status
current_version = dt.version()
print(f"Current version: {current_version}")

# Get history to see checkpoint information
history = dt.history(limit=5)
for commit in history:
    version = commit.get("version")
    operation = commit.get("operation")
    print(f"Version {version}: {operation}")
```

### Table Optimization

```python
# Basic file compaction
optimizer = dt.optimize()
compact_result = optimizer.compact()

print("Compaction results:")
print(f"  Files added: {compact_result.get('num_files_added', 0)}")
print(f"  Files removed: {compact_result.get('num_files_removed', 0)}")
print(f"  Partitions optimized: {compact_result.get('partitions_optimized', 0)}")

# Compact specific partitions
compact_result = optimizer.compact(
    partition_filters=[("year", "=", "2023"), ("month", "=", "01")],
    target_size=128 * 1024 * 1024  # 128MB target file size
)

# Z-order optimization for better query performance
zorder_result = optimizer.z_order(
    columns=["customer_id", "order_date"],  # Columns to optimize for
    target_size=256 * 1024 * 1024  # 256MB target files
)

print("\nZ-order optimization results:")
print(f"  Files added: {zorder_result.get('num_files_added', 0)}")
print(f"  Files removed: {zorder_result.get('num_files_removed', 0)}")
```

### Partition-Specific Optimization

```python
# Optimize only recent partitions
recent_partitions = [
    ("year", "=", "2023"),
    ("month", ">=", "10")
]

compact_result = optimizer.compact(
    partition_filters=recent_partitions,
    target_size=64 * 1024 * 1024,  # 64MB files
    max_concurrent_tasks=4  # Parallel optimization
)

# Z-order specific partitions with different columns
high_traffic_partitions = [("region", "=", "US")]

zorder_result = optimizer.z_order(
    columns=["user_id", "timestamp", "event_type"],
    partition_filters=high_traffic_partitions,
    max_concurrent_tasks=8
)
```

### Comprehensive Maintenance Routine

```python
def comprehensive_maintenance(table_path: str, retention_days: int = 7):
    """Perform comprehensive table maintenance"""
    dt = DeltaTable(table_path)

    print(f"Starting maintenance for {table_path}")
    print(f"Current version: {dt.version()}")

    # 1. Create checkpoint
    print("\n1. Creating checkpoint...")
    dt.create_checkpoint()

    # 2. Optimize table
    print("\n2. Optimizing table...")
    optimizer = dt.optimize()

    # Compact small files
    compact_result = optimizer.compact(target_size=128 * 1024 * 1024)
    print(f"  Compaction: {compact_result.get('num_files_removed', 0)} files removed, "
          f"{compact_result.get('num_files_added', 0)} files added")

    # Z-order on common query columns (example)
    try:
        zorder_result = optimizer.z_order(columns=["id", "created_date"])
        print(f"  Z-order: {zorder_result.get('num_files_removed', 0)} files removed, "
              f"{zorder_result.get('num_files_added', 0)} files added")
    except Exception as e:
        print(f"  Z-order failed (columns may not exist): {e}")

    # 3. Vacuum old files
    print(f"\n3. Vacuuming files older than {retention_days} days...")
    retention_hours = retention_days * 24

    # Dry run first
    files_to_delete = dt.vacuum(retention_hours=retention_hours, dry_run=True)
    print(f"  Would delete {len(files_to_delete)} files")

    # Actually delete
    if files_to_delete:
        deleted_files = dt.vacuum(retention_hours=retention_hours, dry_run=False)
        print(f"  Deleted {len(deleted_files)} files")

    # 4. Clean up metadata
    print("\n4. Cleaning up metadata...")
    dt.cleanup_metadata()

    # 5. Final status
    print(f"\nMaintenance completed. Final version: {dt.version()}")

    return {
        "compaction": compact_result,
        "vacuum_deleted": len(deleted_files) if files_to_delete else 0
    }

# Run comprehensive maintenance
maintenance_result = comprehensive_maintenance("path/to/table", retention_days=14)
```

### Scheduled Maintenance

```python
import schedule
import time
from datetime import datetime

def scheduled_maintenance():
    """Maintenance job for scheduled execution"""
    tables_to_maintain = [
        "path/to/table1",
        "path/to/table2",
        "path/to/table3"
    ]

    for table_path in tables_to_maintain:
        try:
            print(f"\n{datetime.now()}: Maintaining {table_path}")
            result = comprehensive_maintenance(table_path, retention_days=30)
            print(f"Maintenance completed for {table_path}")

        except Exception as e:
            print(f"Maintenance failed for {table_path}: {e}")

def vacuum_only_maintenance():
    """Daily vacuum-only maintenance"""
    tables = ["path/to/high_traffic_table1", "path/to/high_traffic_table2"]

    for table_path in tables:
        try:
            dt = DeltaTable(table_path)
            deleted = dt.vacuum(retention_hours=7 * 24, dry_run=False)
            print(f"Daily vacuum for {table_path}: deleted {len(deleted)} files")
        except Exception as e:
            print(f"Daily vacuum failed for {table_path}: {e}")

# Schedule maintenance jobs
schedule.every().sunday.at("02:00").do(scheduled_maintenance)  # Weekly
schedule.every().day.at("01:00").do(vacuum_only_maintenance)  # Daily vacuum

# Run scheduler (in production, this would be in a separate service)
# while True:
#     schedule.run_pending()
#     time.sleep(3600)  # Check every hour
```

### Table Repair and Health Checks

```python
def table_health_check(table_path: str):
    """Comprehensive table health check"""
    dt = DeltaTable(table_path)

    print(f"Health check for {table_path}")
    print(f"Current version: {dt.version()}")

    # Check basic table properties
    try:
        schema = dt.schema()
        metadata = dt.metadata()
        files = dt.files()

        print(f"Schema fields: {len(schema.fields)}")
        print(f"Table files: {len(files)}")
        print(f"Partition columns: {metadata.partition_columns}")

    except Exception as e:
        print(f"Error reading table properties: {e}")
        return False

    # Check for repair needs
    try:
        repair_result = dt.repair_table(dry_run=True)
        issues_found = repair_result.get("issues_found", 0)

        if issues_found > 0:
            print(f"Found {issues_found} issues that need repair")

            # Actually repair if needed
            repair_result = dt.repair_table(dry_run=False)
            print(f"Repair completed: {repair_result}")
        else:
            print("No repair issues found")

    except Exception as e:
        print(f"Repair check failed: {e}")

    # Check file size distribution
    try:
        add_actions = dt.get_add_actions()
        sizes_df = add_actions.to_pandas()

        if not sizes_df.empty:
            avg_size = sizes_df['size'].mean()
            min_size = sizes_df['size'].min()
            max_size = sizes_df['size'].max()

            print(f"File sizes - Avg: {avg_size/1024/1024:.1f}MB, "
                  f"Min: {min_size/1024/1024:.1f}MB, Max: {max_size/1024/1024:.1f}MB")

            # Recommend optimization if files are very uneven or small (10MB avg),
            # guarding against division by zero for empty files
            if (min_size > 0 and max_size / min_size > 100) or avg_size < 10 * 1024 * 1024:
                print("Recommendation: Consider running optimize.compact()")

    except Exception as e:
        print(f"File analysis failed: {e}")

    return True

# Run health checks
tables_to_check = ["path/to/table1", "path/to/table2"]
for table_path in tables_to_check:
    table_health_check(table_path)
    print("-" * 50)
```