or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

backend-management.mdconfiguration-key-groups.mdindex.mdkey-state-management.mdserialization-framework.mdstate-types-operations.mdtransaction-management.md

transaction-management.mddocs/

0

# Transaction Management

1

2

Comprehensive transaction system implementing four-phase commit protocol with finish, commit, ackCommit, and rollback operations for ensuring data consistency during failures and supporting checkpoint-based recovery.

3

4

## Capabilities

5

6

### State Store Manager Interface

7

8

Core transaction interface defining the four-phase commit protocol for state management with checkpoint-based recovery support.

9

10

```java { .api }

11

/**

12

* Transaction state interface supporting four-phase commit protocol

13

*/

14

public interface StateStoreManager {

15

/**

16

* Finish phase - Complete batch data saving and serialization

17

* This is typically where serialization work is performed

18

* @param checkpointId Checkpoint identifier for this transaction

19

*/

20

void finish(long checkpointId);

21

22

/**

23

* Commit phase - Persist data to storage (can be async)

24

* This is typically where data persistence is performed

25

* @param checkpointId Checkpoint identifier for this transaction

26

*/

27

void commit(long checkpointId);

28

29

/**

30

* Acknowledge commit phase - Clean up after successful commit

31

* Must be called after commit in the same thread

32

* @param checkpointId Checkpoint identifier for this transaction

33

* @param timeStamp Timestamp of the acknowledgment

34

*/

35

void ackCommit(long checkpointId, long timeStamp);

36

37

/**

38

* Rollback phase - Recover from checkpoint on failure

39

* @param checkpointId Checkpoint identifier to rollback to

40

*/

41

void rollBack(long checkpointId);

42

}

43

```

44

45

**Usage Examples:**

46

47

```java

48

// Example transaction flow for successful checkpoint

49

StateStoreManager stateManager = keyStateBackend; // KeyStateBackend implements StateStoreManager

50

51

long checkpointId = 1001L;

52

53

try {

54

// Phase 1: Finish - serialize and prepare data

55

stateManager.finish(checkpointId);

56

57

// Phase 2: Commit - persist data (can be in separate thread)

58

stateManager.commit(checkpointId);

59

60

// Phase 3: Acknowledge - cleanup after successful commit

61

long timestamp = System.currentTimeMillis();

62

stateManager.ackCommit(checkpointId, timestamp);

63

64

System.out.println("Checkpoint " + checkpointId + " completed successfully");

65

66

} catch (Exception e) {

67

// Phase 4: Rollback - recover on failure

68

stateManager.rollBack(checkpointId);

69

System.err.println("Checkpoint " + checkpointId + " failed, rolled back");

70

}

71

```

72

73

### Abstract Key State Backend Transaction Support

74

75

Base implementation providing transaction operations and context management for key-based state backends.

76

77

```java { .api }

78

/**

79

* Base class providing transaction support and state management

80

*/

81

public abstract class AbstractKeyStateBackend implements StateStoreManager {

82

/**

83

* Finish checkpoint - complete batch data saving and serialization

84

* @param checkpointId Checkpoint identifier

85

*/

86

public void finish(long checkpointId);

87

88

/**

89

* Commit checkpoint - persist data (can be async)

90

* @param checkpointId Checkpoint identifier

91

*/

92

public void commit(long checkpointId);

93

94

/**

95

* Acknowledge commit - clean up after commit

96

* @param checkpointId Checkpoint identifier

97

* @param timeStamp Timestamp of acknowledgment

98

*/

99

public void ackCommit(long checkpointId, long timeStamp);

100

101

/**

102

* Rollback checkpoint - recover from checkpoint

103

* @param checkpointId Checkpoint identifier

104

*/

105

public void rollBack(long checkpointId);

106

107

/**

108

* Get current checkpoint ID

109

* @return Current checkpoint ID

110

*/

111

public long getCheckpointId();

112

113

/**

114

* Set checkpoint ID for transaction context

115

* @param checkpointId Checkpoint ID to set

116

*/

117

public void setCheckpointId(long checkpointId);

118

119

/**

120

* Set complete processing context

121

* @param checkpointId Checkpoint identifier

122

* @param currentKey Current processing key

123

*/

124

public void setContext(long checkpointId, Object currentKey);

125

}

126

```

127

128

### State Store Manager Proxy

129

130

Abstract proxy class supporting transaction state operations with strategy delegation for different storage backends.

131

132

```java { .api }

133

/**

134

* Proxy supporting transaction state operations with strategy delegation

135

*/

136

public abstract class StateStoreManagerProxy<V> implements StateStoreManager {

137

/**

138

* Create state store manager proxy

139

* @param keyStateBackend Backend providing transaction support

140

* @param stateDescriptor Descriptor defining the state

141

*/

142

public StateStoreManagerProxy(AbstractKeyStateBackend keyStateBackend, AbstractStateDescriptor stateDescriptor);

143

144

/**

145

* Finish checkpoint phase

146

* @param checkpointId Checkpoint identifier

147

*/

148

public void finish(long checkpointId);

149

150

/**

151

* Commit checkpoint phase (can be async)

152

* @param checkpointId Checkpoint identifier

153

*/

154

public void commit(long checkpointId);

155

156

/**

157

* Acknowledge commit phase

158

* @param checkpointId Checkpoint identifier

159

* @param timeStamp Timestamp of acknowledgment

160

*/

161

public void ackCommit(long checkpointId, long timeStamp);

162

163

/**

164

* Rollback checkpoint phase

165

* @param checkpointId Checkpoint identifier

166

*/

167

public void rollBack(long checkpointId);

168

169

/**

170

* Close proxy and cleanup resources

171

*/

172

public void close();

173

174

/**

175

* Get value by key

176

* @param key State key

177

* @return Retrieved value

178

*/

179

public V get(String key);

180

181

/**

182

* Put value by key

183

* @param key State key

184

* @param value Value to store

185

*/

186

public void put(String key, V value);

187

}

188

```

189

190

### Abstract State Store Manager

191

192

Abstract base class for state store managers implementing three-layer storage architecture (front, middle, remote) with serialization support.

193

194

```java { .api }

195

/**

196

* Abstract base for state store managers with three-layer storage

197

*/

198

public abstract class AbstractStateStoreManager<V> {

199

/**

200

* Create state store manager with backend storage

201

* @param backStore Backend key-value store for persistence

202

*/

203

public AbstractStateStoreManager(KeyValueStore<String, Map<Long, byte[]>> backStore);

204

205

/**

206

* Convert storage record to bytes for serialization

207

* @param storageRecord Record to serialize

208

* @return Serialized bytes

209

*/

210

public byte[] toBytes(StorageRecord storageRecord);

211

212

/**

213

* Convert bytes back to storage record

214

* @param data Serialized bytes

215

* @return Deserialized storage record

216

*/

217

public StorageRecord<V> toStorageRecord(byte[] data);

218

219

/**

220

* Get value for checkpoint and key (abstract)

221

* @param checkpointId Checkpoint identifier

222

* @param key State key

223

* @return Retrieved value

224

*/

225

public abstract V get(long checkpointId, String key);

226

227

/**

228

* Put value for checkpoint and key

229

* @param checkpointId Checkpoint identifier

230

* @param k State key

231

* @param v Value to store

232

*/

233

public void put(long checkpointId, String k, V v);

234

235

/**

236

* Acknowledge commit with timestamp

237

* @param checkpointId Checkpoint identifier

238

* @param timeStamp Acknowledgment timestamp

239

*/

240

public void ackCommit(long checkpointId, long timeStamp);

241

242

/**

243

* Acknowledge commit (abstract)

244

* @param checkpointId Checkpoint identifier

245

*/

246

public abstract void ackCommit(long checkpointId);

247

248

/**

249

* Set key group index for partitioning

250

* @param keyGroupIndex Key group index

251

*/

252

public void setKeyGroupIndex(int keyGroupIndex);

253

254

/**

255

* Close and cleanup resources

256

*/

257

public void close();

258

}

259

```

260

261

### Storage Strategy Implementations

262

263

The library provides two main storage strategies for different consistency and performance requirements.

264

265

#### Dual Version State Store Manager

266

267

```java { .api }

268

/**

269

* Dual-version state store manager supporting rollback

270

*/

271

public class DualStateStoreManager<V> extends AbstractStateStoreManager<V> {

272

// Maintains two versions of data for rollback support

273

// Implements DUAL_VERSION strategy

274

}

275

```

276

277

#### Multi-Version State Store Manager

278

279

```java { .api }

280

/**

281

* Multi-version state store manager for MVCC scenarios

282

*/

283

public class MVStateStoreManager<V> extends AbstractStateStoreManager<V> {

284

// Optimized for MVCC storage systems

285

// Implements SINGLE_VERSION strategy

286

}

287

```

288

289

**Transaction Flow Examples:**

290

291

```java

292

// Example 1: Basic transaction with error handling

293

KeyStateBackend backend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);

294

295

// Set up state

296

ValueStateDescriptor<String> desc = ValueStateDescriptor.build("test-state", String.class, "");

297

ValueState<String> state = backend.getValueState(desc);

298

299

backend.setCurrentKey("key1");

300

state.update("value1");

301

302

// Perform transaction

303

long checkpointId = System.currentTimeMillis();

304

try {

305

backend.setCheckpointId(checkpointId);

306

backend.finish(checkpointId);

307

backend.commit(checkpointId);

308

backend.ackCommit(checkpointId, System.currentTimeMillis());

309

System.out.println("Transaction completed successfully");

310

} catch (Exception e) {

311

backend.rollBack(checkpointId);

312

System.err.println("Transaction failed, rolled back: " + e.getMessage());

313

}

314

315

// Example 2: Async commit pattern

316

ExecutorService executor = Executors.newSingleThreadExecutor();

317

318

backend.finish(checkpointId);

319

320

// Commit can be performed asynchronously

321

CompletableFuture<Void> commitFuture = CompletableFuture.runAsync(() -> {

322

try {

323

backend.commit(checkpointId);

324

} catch (Exception e) {

325

throw new RuntimeException("Commit failed", e);

326

}

327

}, executor);

328

329

commitFuture.whenComplete((result, throwable) -> {

330

if (throwable == null) {

331

// Acknowledge must be in same thread as commit

332

backend.ackCommit(checkpointId, System.currentTimeMillis());

333

System.out.println("Async commit completed");

334

} else {

335

backend.rollBack(checkpointId);

336

System.err.println("Async commit failed: " + throwable.getMessage());

337

}

338

});

339

340

// Example 3: Multiple state operations in single transaction

341

backend.setCurrentKey("user123");

342

long txnId = 2001L;

343

344

// Modify multiple states

345

ValueState<String> nameState = backend.getValueState(nameDesc);

346

ListState<String> eventState = backend.getListState(eventDesc);

347

MapState<String, Integer> counterState = backend.getMapState(counterDesc);

348

349

nameState.update("Alice");

350

eventState.add("login");

351

counterState.put("sessions", 1);

352

353

// Commit all changes atomically

354

try {

355

backend.setCheckpointId(txnId);

356

backend.finish(txnId);

357

backend.commit(txnId);

358

backend.ackCommit(txnId, System.currentTimeMillis());

359

360

System.out.println("All state changes committed atomically");

361

} catch (Exception e) {

362

backend.rollBack(txnId);

363

System.err.println("All state changes rolled back due to failure");

364

}

365

```

366

367

### Error Handling and Recovery

368

369

The transaction system provides comprehensive error handling and recovery mechanisms:

370

371

```java

372

// Custom exception handling with detailed recovery

373

public class StreamingStateTransactionManager {

374

private final KeyStateBackend backend;

375

private final Map<Long, TransactionState> activeTransactions = new ConcurrentHashMap<>();

376

377

public void performCheckpoint(long checkpointId) {

378

TransactionState txnState = new TransactionState(checkpointId);

379

activeTransactions.put(checkpointId, txnState);

380

381

try {

382

// Phase 1: Finish

383

txnState.setPhase("FINISH");

384

backend.finish(checkpointId);

385

386

// Phase 2: Commit

387

txnState.setPhase("COMMIT");

388

backend.commit(checkpointId);

389

390

// Phase 3: Acknowledge

391

txnState.setPhase("ACK_COMMIT");

392

backend.ackCommit(checkpointId, System.currentTimeMillis());

393

394

txnState.setPhase("COMPLETED");

395

System.out.println("Transaction " + checkpointId + " completed successfully");

396

397

} catch (Exception e) {

398

handleTransactionFailure(checkpointId, txnState, e);

399

} finally {

400

activeTransactions.remove(checkpointId);

401

}

402

}

403

404

private void handleTransactionFailure(long checkpointId, TransactionState txnState, Exception error) {

405

System.err.println("Transaction " + checkpointId + " failed in phase " + txnState.getPhase() + ": " + error.getMessage());

406

407

try {

408

backend.rollBack(checkpointId);

409

System.out.println("Successfully rolled back transaction " + checkpointId);

410

} catch (Exception rollbackError) {

411

System.err.println("CRITICAL: Rollback failed for transaction " + checkpointId + ": " + rollbackError.getMessage());

412

// Additional recovery logic would go here

413

}

414

}

415

416

private static class TransactionState {

417

private final long checkpointId;

418

private String phase;

419

private final long startTime;

420

421

public TransactionState(long checkpointId) {

422

this.checkpointId = checkpointId;

423

this.startTime = System.currentTimeMillis();

424

this.phase = "INIT";

425

}

426

427

public void setPhase(String phase) { this.phase = phase; }

428

public String getPhase() { return phase; }

429

}

430

}

431

```