or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-reading.md · index.md · query-operations.md · schema-management.md · table-maintenance.md · table-operations.md · transaction-management.md · writing-modification.md

docs/transaction-management.md

0

# Transaction Management

1

2

Transaction properties, commit configurations, and ACID transaction control for ensuring data consistency and managing concurrent access to Delta Lake tables.

3

4

## Capabilities

5

6

### Commit Properties

7

8

```python { .api }

9

@dataclass

10

class CommitProperties:

11

def __init__(

12

self,

13

max_retry_commit_attempts: int | None = None,

14

app_metadata: dict[str, Any] | None = None,

15

app_id: str | None = None

16

) -> None: ...

17

18

@property

19

def max_retry_commit_attempts(self) -> int | None: ...

20

21

@property

22

def app_metadata(self) -> dict[str, Any] | None: ...

23

24

@property

25

def app_id(self) -> str | None: ...

26

```

27

28

Configuration for commit behavior and retry logic.

29

30

### Post-Commit Hook Properties

31

32

```python { .api }

33

@dataclass

34

class PostCommitHookProperties:

35

def __init__(

36

self,

37

create_checkpoint: bool = True,

38

cleanup_expired_logs: bool | None = None,

39

) -> None: ...

40

41

@property

42

def create_checkpoint(self) -> bool: ...

43

44

@property

45

def cleanup_expired_logs(self) -> bool | None: ...

46

```

47

48

Configuration for operations that run after successful commits.

49

50

### Transaction Class

51

52

```python { .api }

53

class Transaction:

54

def commit(

55

self,

56

actions: list[AddAction],

57

commit_properties: CommitProperties | None = None,

58

post_commit_hook_properties: PostCommitHookProperties | None = None

59

) -> int: ...

60

```

61

62

Low-level transaction interface for advanced operations.

63

64

### Add Action

65

66

```python { .api }

67

@dataclass

68

class AddAction:

69

path: str

70

size: int

71

partition_values: Mapping[str, str | None]

72

modification_time: int

73

data_change: bool

74

stats: str

75

```

76

77

Represents a file addition in the transaction log.

78

79

### Transaction Helper Functions

80

81

```python { .api }

82

def create_table_with_add_actions(

83

table_uri: str,

84

schema: Schema,

85

add_actions: list[AddAction],

86

partition_by: list[str] | None = None,

87

name: str | None = None,

88

description: str | None = None,

89

configuration: dict[str, str | None] | None = None,

90

storage_options: dict[str, str] | None = None

91

) -> DeltaTable: ...

92

```

93

94

Create a table directly from add actions (advanced use case).

95

96

## Usage Examples

97

98

### Basic Transaction Configuration

99

100

```python

101

from deltalake import DeltaTable, write_deltalake

102

from deltalake.transaction import CommitProperties, PostCommitHookProperties

103

104

# Configure commit properties

105

commit_props = CommitProperties(

106

max_retry_commit_attempts=5,

107

app_metadata={"application": "data_pipeline", "version": "1.0.0"},

108

app_id="my_application"

109

)

110

111

# Configure post-commit hooks

112

post_commit_props = PostCommitHookProperties(

113

create_checkpoint=True, # Create checkpoints automatically

114

cleanup_expired_logs=True # Clean up old log files

115

)

116

117

# Use in write operation

118

write_deltalake(

119

"path/to/table",

120

data,

121

mode="append",

122

commit_properties=commit_props,

123

post_commithook_properties=post_commit_props

124

)

125

```

126

127

### Transaction Properties in Updates

128

129

```python

130

dt = DeltaTable("path/to/table")

131

132

# Update with transaction properties

133

result = dt.update(

134

predicate="status = 'pending'",

135

new_values={"status": "processed", "processed_at": "current_timestamp()"},

136

commit_properties=CommitProperties(

137

max_retry_commit_attempts=3,

138

app_metadata={"operation": "batch_update", "batch_id": "12345"}

139

),

140

post_commithook_properties=PostCommitHookProperties(

141

create_checkpoint=False, # Skip checkpoint for this update

142

cleanup_expired_logs=False

143

)

144

)

145

146

print(f"Updated {result['num_updated_rows']} rows")

147

```

148

149

### Merge with Transaction Control

150

151

```python

152

import pyarrow as pa

153

154

source_data = pa.table({

155

'id': [1, 2, 3],

156

'status': ['active', 'inactive', 'pending'],

157

'last_modified': ['2023-01-01', '2023-01-02', '2023-01-03']

158

})

159

160

# Merge with custom transaction properties

161

merge_result = (

162

dt.merge(

163

source=source_data,

164

predicate="target.id = source.id",

165

commit_properties=CommitProperties(

166

max_retry_commit_attempts=10, # High retry for critical operation

167

app_metadata={

168

"operation": "daily_sync",

169

"source": "external_system",

170

"sync_timestamp": "2023-01-15T10:00:00Z"

171

}

172

),

173

post_commithook_properties=PostCommitHookProperties(

174

create_checkpoint=True,

175

cleanup_expired_logs=True

176

)

177

)

178

.when_matched_update_all()

179

.when_not_matched_insert_all()

180

.execute()

181

)

182

```

183

184

### Delete with Transaction Properties

185

186

```python

187

# Delete with tracking metadata

188

delete_result = dt.delete(

189

predicate="created_at < '2022-01-01'",

190

commit_properties=CommitProperties(

191

app_metadata={

192

"operation": "data_retention_cleanup",

193

"retention_policy": "delete_older_than_1_year",

194

"executed_by": "automated_cleanup_job"

195

}

196

),

197

post_commithook_properties=PostCommitHookProperties(

198

create_checkpoint=True, # Create checkpoint after cleanup

199

cleanup_expired_logs=True # Clean logs after delete

200

)

201

)

202

203

print(f"Deleted {delete_result['num_deleted_rows']} old records")

204

```

205

206

### Advanced Transaction Handling

207

208

```python

209

from deltalake.transaction import Transaction, AddAction

210

211

# Low-level transaction for advanced scenarios

212

def custom_transaction_example():

213

dt = DeltaTable("path/to/table")

214

215

# Create add actions manually (advanced use case)

216

add_actions = [

217

AddAction(

218

path="data/part-00001.parquet",

219

size=1024000,

220

partition_values={"year": "2023", "month": "01"},

221

modification_time=1672531200000, # Unix timestamp in milliseconds

222

data_change=True,

223

stats='{"numRecords": 1000}'

224

)

225

]

226

227

# Get transaction and commit

228

transaction = dt._table.create_transaction() # Internal API

229

230

commit_props = CommitProperties(

231

max_retry_commit_attempts=3,

232

app_metadata={"custom_operation": "manual_file_addition"}

233

)

234

235

# Commit the transaction

236

version = transaction.commit(

237

add_actions,

238

commit_properties=commit_props

239

)

240

241

print(f"Committed transaction at version {version}")

242

```

243

244

### Retry and Conflict Handling

245

246

```python

247

import time

248

from deltalake.exceptions import CommitFailedError

249

250

def robust_update_with_retry():

251

dt = DeltaTable("path/to/table")

252

253

max_retries = 5

254

base_delay = 1.0

255

256

for attempt in range(max_retries):

257

try:

258

result = dt.update(

259

predicate="status = 'processing'",

260

new_values={"status": "completed"},

261

commit_properties=CommitProperties(

262

max_retry_commit_attempts=1, # Let us handle retries

263

app_metadata={

264

"attempt": attempt + 1,

265

"operation": "status_update"

266

}

267

)

268

)

269

270

print(f"Update succeeded on attempt {attempt + 1}")

271

return result

272

273

except CommitFailedError as e:

274

if attempt == max_retries - 1:

275

print(f"Update failed after {max_retries} attempts")

276

raise

277

278

# Exponential backoff

279

delay = base_delay * (2 ** attempt)

280

print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")

281

time.sleep(delay)

282

283

# Reload table to get latest state

284

dt = DeltaTable(dt.table_uri)

285

286

# Use the robust update function

287

try:

288

result = robust_update_with_retry()

289

print(f"Successfully updated {result['num_updated_rows']} rows")

290

except Exception as e:

291

print(f"All retry attempts failed: {e}")

292

```

293

294

### Monitoring Transaction Metadata

295

296

```python

297

def analyze_transaction_history():

298

dt = DeltaTable("path/to/table")

299

300

# Get commit history

301

history = dt.history(limit=10)

302

303

print("Recent transaction history:")

304

for commit in history:

305

version = commit.get("version")

306

operation = commit.get("operation", "unknown")

307

timestamp = commit.get("timestamp")

308

309

# Extract app metadata if present

310

operation_parameters = commit.get("operationParameters", {})

311

app_metadata = operation_parameters.get("app_metadata")

312

313

print(f"Version {version}: {operation} at {timestamp}")

314

315

if app_metadata:

316

print(f" App metadata: {app_metadata}")

317

318

# Show operation metrics

319

if "operationMetrics" in commit:

320

metrics = commit["operationMetrics"]

321

for key, value in metrics.items():

322

print(f" {key}: {value}")

323

324

print()

325

326

# Run analysis

327

analyze_transaction_history()

328

```

329

330

### Checkpoint and Log Management

331

332

```python

333

# Manual checkpoint creation

334

dt.create_checkpoint()

335

print("Checkpoint created successfully")

336

337

# Cleanup metadata (removes old log files)

338

dt.cleanup_metadata()

339

print("Old metadata cleaned up")

340

341

# Configure automatic cleanup

342

post_commit_props = PostCommitHookProperties(

343

create_checkpoint=True,

344

cleanup_expired_logs=True

345

)

346

347

# Pass the post-commit settings to the write; they apply only to this call

348

write_deltalake(

349

dt,

350

new_data,

351

mode="append",

352

post_commithook_properties=post_commit_props

353

)

354

```