# Transaction Management

A transaction system built on a four-phase commit protocol (`finish`, `commit`, `ackCommit`, and `rollBack`) that keeps state consistent across failures and supports checkpoint-based recovery.

## Capabilities

### State Store Manager Interface

The core transaction interface. It defines the four phases of the commit protocol for state management and supports checkpoint-based recovery.

```java { .api }
/**
 * Transaction state interface supporting four-phase commit protocol
 */
public interface StateStoreManager {
    /**
     * Finish phase - Complete batch data saving and serialization
     * This is typically where serialization work is performed
     * @param checkpointId Checkpoint identifier for this transaction
     */
    void finish(long checkpointId);

    /**
     * Commit phase - Persist data to storage (can be async)
     * This is typically where data persistence is performed
     * @param checkpointId Checkpoint identifier for this transaction
     */
    void commit(long checkpointId);

    /**
     * Acknowledge commit phase - Clean up after successful commit
     * Must be called after commit in the same thread
     * @param checkpointId Checkpoint identifier for this transaction
     * @param timeStamp Timestamp of the acknowledgment
     */
    void ackCommit(long checkpointId, long timeStamp);

    /**
     * Rollback phase - Recover from checkpoint on failure
     * @param checkpointId Checkpoint identifier to rollback to
     */
    void rollBack(long checkpointId);
}
```

**Usage Examples:**

```java
// Example transaction flow for successful checkpoint
StateStoreManager stateManager = keyStateBackend; // KeyStateBackend implements StateStoreManager

long checkpointId = 1001L;

try {
    // Phase 1: Finish - serialize and prepare data
    stateManager.finish(checkpointId);

    // Phase 2: Commit - persist data (can be in separate thread)
    stateManager.commit(checkpointId);

    // Phase 3: Acknowledge - cleanup after successful commit
    long timestamp = System.currentTimeMillis();
    stateManager.ackCommit(checkpointId, timestamp);

    System.out.println("Checkpoint " + checkpointId + " completed successfully");

} catch (Exception e) {
    // Phase 4: Rollback - recover on failure
    stateManager.rollBack(checkpointId);
    System.err.println("Checkpoint " + checkpointId + " failed, rolled back");
}
```

### Abstract Key State Backend Transaction Support

Base implementation providing transaction operations and context management for key-based state backends.

```java { .api }
/**
 * Base class providing transaction support and state management
 */
public abstract class AbstractKeyStateBackend implements StateStoreManager {
    /**
     * Finish checkpoint - complete batch data saving and serialization
     * @param checkpointId Checkpoint identifier
     */
    public void finish(long checkpointId);

    /**
     * Commit checkpoint - persist data (can be async)
     * @param checkpointId Checkpoint identifier
     */
    public void commit(long checkpointId);

    /**
     * Acknowledge commit - clean up after commit
     * @param checkpointId Checkpoint identifier
     * @param timeStamp Timestamp of acknowledgment
     */
    public void ackCommit(long checkpointId, long timeStamp);

    /**
     * Rollback checkpoint - recover from checkpoint
     * @param checkpointId Checkpoint identifier
     */
    public void rollBack(long checkpointId);

    /**
     * Get current checkpoint ID
     * @return Current checkpoint ID
     */
    public long getCheckpointId();

    /**
     * Set checkpoint ID for transaction context
     * @param checkpointId Checkpoint ID to set
     */
    public void setCheckpointId(long checkpointId);

    /**
     * Set complete processing context
     * @param checkpointId Checkpoint identifier
     * @param currentKey Current processing key
     */
    public void setContext(long checkpointId, Object currentKey);
}
```

### State Store Manager Proxy

Abstract proxy class supporting transaction state operations with strategy delegation for different storage backends.

```java { .api }
/**
 * Proxy supporting transaction state operations with strategy delegation
 */
public abstract class StateStoreManagerProxy<V> implements StateStoreManager {
    /**
     * Create state store manager proxy
     * @param keyStateBackend Backend providing transaction support
     * @param stateDescriptor Descriptor defining the state
     */
    public StateStoreManagerProxy(AbstractKeyStateBackend keyStateBackend, AbstractStateDescriptor stateDescriptor);

    /**
     * Finish checkpoint phase
     * @param checkpointId Checkpoint identifier
     */
    public void finish(long checkpointId);

    /**
     * Commit checkpoint phase (can be async)
     * @param checkpointId Checkpoint identifier
     */
    public void commit(long checkpointId);

    /**
     * Acknowledge commit phase
     * @param checkpointId Checkpoint identifier
     * @param timeStamp Timestamp of acknowledgment
     */
    public void ackCommit(long checkpointId, long timeStamp);

    /**
     * Rollback checkpoint phase
     * @param checkpointId Checkpoint identifier
     */
    public void rollBack(long checkpointId);

    /**
     * Close proxy and cleanup resources
     */
    public void close();

    /**
     * Get value by key
     * @param key State key
     * @return Retrieved value
     */
    public V get(String key);

    /**
     * Put value by key
     * @param key State key
     * @param value Value to store
     */
    public void put(String key, V value);
}
```
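
The idea of strategy delegation can be pictured with a small, self-contained sketch. It is not the library's code: `ToyStrategy`, `ToyStore`, `ToyMapStore`, `ToyDualStore`, `ToySingleStore`, and `ToyProxy` are hypothetical stand-ins, and only the strategy names `DUAL_VERSION` and `SINGLE_VERSION` are taken from this document. The sketch assumes the proxy chooses one store when it is constructed and then forwards every call to it.

```java
import java.util.HashMap;
import java.util.Map;

// Strategy names come from this document; the enum itself is a stand-in.
enum ToyStrategy { DUAL_VERSION, SINGLE_VERSION }

// Hypothetical store contract: values are addressed by checkpoint id and key.
interface ToyStore<V> {
    V get(long checkpointId, String key);
    void put(long checkpointId, String key, V value);
    String name();
}

// Shared toy behaviour: one plain map per store, ignoring versioning details.
abstract class ToyMapStore<V> implements ToyStore<V> {
    private final Map<String, V> data = new HashMap<>();

    @Override
    public V get(long checkpointId, String key) {
        return data.get(key);
    }

    @Override
    public void put(long checkpointId, String key, V value) {
        data.put(key, value);
    }
}

class ToyDualStore<V> extends ToyMapStore<V> {
    @Override
    public String name() { return "dual"; }
}

class ToySingleStore<V> extends ToyMapStore<V> {
    @Override
    public String name() { return "single"; }
}

// The proxy picks one store when it is built and forwards every call to it,
// supplying the current checkpoint id so callers only deal with keys.
class ToyProxy<V> {
    private final ToyStore<V> delegate;
    private long checkpointId;

    ToyProxy(ToyStrategy strategy) {
        this.delegate = createStore(strategy);
    }

    private static <T> ToyStore<T> createStore(ToyStrategy strategy) {
        switch (strategy) {
            case DUAL_VERSION:
                return new ToyDualStore<>();
            case SINGLE_VERSION:
                return new ToySingleStore<>();
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    void setCheckpointId(long checkpointId) { this.checkpointId = checkpointId; }
    V get(String key) { return delegate.get(checkpointId, key); }
    void put(String key, V value) { delegate.put(checkpointId, key, value); }
    String delegateName() { return delegate.name(); }
}
```

Callers of such a proxy never see which store is in use, so the storage strategy can change without touching the code that reads and writes state.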

### Abstract State Store Manager

Abstract base class for state store managers implementing a three-layer storage architecture (front, middle, remote) with serialization support.

```java { .api }
/**
 * Abstract base for state store managers with three-layer storage
 */
public abstract class AbstractStateStoreManager<V> {
    /**
     * Create state store manager with backend storage
     * @param backStore Backend key-value store for persistence
     */
    public AbstractStateStoreManager(KeyValueStore<String, Map<Long, byte[]>> backStore);

    /**
     * Convert storage record to bytes for serialization
     * @param storageRecord Record to serialize
     * @return Serialized bytes
     */
    public byte[] toBytes(StorageRecord storageRecord);

    /**
     * Convert bytes back to storage record
     * @param data Serialized bytes
     * @return Deserialized storage record
     */
    public StorageRecord<V> toStorageRecord(byte[] data);

    /**
     * Get value for checkpoint and key (abstract)
     * @param checkpointId Checkpoint identifier
     * @param key State key
     * @return Retrieved value
     */
    public abstract V get(long checkpointId, String key);

    /**
     * Put value for checkpoint and key
     * @param checkpointId Checkpoint identifier
     * @param k State key
     * @param v Value to store
     */
    public void put(long checkpointId, String k, V v);

    /**
     * Acknowledge commit with timestamp
     * @param checkpointId Checkpoint identifier
     * @param timeStamp Acknowledgment timestamp
     */
    public void ackCommit(long checkpointId, long timeStamp);

    /**
     * Acknowledge commit (abstract)
     * @param checkpointId Checkpoint identifier
     */
    public abstract void ackCommit(long checkpointId);

    /**
     * Set key group index for partitioning
     * @param keyGroupIndex Key group index
     */
    public void setKeyGroupIndex(int keyGroupIndex);

    /**
     * Close and cleanup resources
     */
    public void close();
}
```
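
One way to picture how three layers could line up with the commit phases is the toy store below. It is a sketch under assumptions rather than the library's implementation: the class `ThreeLayerSketch`, its `layerOf` helper, and the mapping of `finish` to front-to-middle serialization and `commit` to middle-to-remote persistence are all illustrative choices. Values are plain non-null strings so that serialization stays trivial.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy three-layer store for String values. Every name here is hypothetical.
class ThreeLayerSketch {
    // Front: live, unserialized writes made since the last finish().
    private final Map<String, String> front = new HashMap<>();
    // Middle: serialized snapshots waiting to be persisted, one per checkpoint.
    private final TreeMap<Long, Map<String, byte[]>> middle = new TreeMap<>();
    // Remote: in-memory stand-in for the durable backing store.
    private final Map<String, byte[]> remote = new HashMap<>();

    void put(String key, String value) {
        front.put(key, value);
    }

    // finish: serialize the front layer into a snapshot for this checkpoint.
    void finish(long checkpointId) {
        Map<String, byte[]> snapshot = new HashMap<>();
        for (Map.Entry<String, String> e : front.entrySet()) {
            snapshot.put(e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8));
        }
        middle.put(checkpointId, snapshot);
        front.clear();
    }

    // commit: copy the checkpoint's snapshot into the remote layer.
    void commit(long checkpointId) {
        Map<String, byte[]> snapshot = middle.get(checkpointId);
        if (snapshot != null) {
            remote.putAll(snapshot);
        }
    }

    // ackCommit: the snapshot is persisted, so the middle copy can be dropped.
    void ackCommit(long checkpointId) {
        middle.remove(checkpointId);
    }

    // Read path: front first, then snapshots from newest to oldest, then remote.
    String get(String key) {
        if (front.containsKey(key)) {
            return front.get(key);
        }
        for (Map<String, byte[]> snapshot : middle.descendingMap().values()) {
            byte[] bytes = snapshot.get(key);
            if (bytes != null) {
                return new String(bytes, StandardCharsets.UTF_8);
            }
        }
        byte[] bytes = remote.get(key);
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    // Reports which layer would answer a read for the key.
    String layerOf(String key) {
        if (front.containsKey(key)) {
            return "front";
        }
        for (Map<String, byte[]> snapshot : middle.values()) {
            if (snapshot.containsKey(key)) {
                return "middle";
            }
        }
        return remote.containsKey(key) ? "remote" : "none";
    }
}
```

In this sketch a value written with `put` is served from the front layer, moves to the middle layer on `finish`, and is served from the remote layer once `commit` and `ackCommit` have run.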

### Storage Strategy Implementations

The library provides two main storage strategies for different consistency and performance requirements.

#### Dual Version State Store Manager

```java { .api }
/**
 * Dual-version state store manager supporting rollback
 */
public class DualStateStoreManager<V> extends AbstractStateStoreManager<V> {
    // Maintains two versions of data for rollback support
    // Implements DUAL_VERSION strategy
}
```
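
The general two-version idea can be illustrated with a short sketch. It shows the technique in its simplest form and makes no claim about how `DualStateStoreManager` lays out its data: `DualVersionSketch` and its fields are hypothetical, and checkpoint ids are left out to keep the example small.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of keeping two versions per key.
class DualVersionSketch<V> {
    // Last acknowledged version of every key.
    private final Map<String, V> committed = new HashMap<>();
    // Changes made since the last acknowledgment.
    private final Map<String, V> working = new HashMap<>();

    void put(String key, V value) {
        working.put(key, value);
    }

    // Reads prefer the working version and fall back to the committed one.
    V get(String key) {
        return working.containsKey(key) ? working.get(key) : committed.get(key);
    }

    // Acknowledge: the working version becomes the committed version.
    void ackCommit() {
        committed.putAll(working);
        working.clear();
    }

    // Roll back: discard the working version, leaving the committed one intact.
    void rollBack() {
        working.clear();
    }
}
```

Because the committed version is never modified until acknowledgment, a rollback only has to throw away the working copy.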

#### Multi-Version State Store Manager

```java { .api }
/**
 * Multi-version state store manager for MVCC scenarios
 */
public class MVStateStoreManager<V> extends AbstractStateStoreManager<V> {
    // Optimized for MVCC storage systems
    // Implements SINGLE_VERSION strategy
}
```

**Transaction Flow Examples:**

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Example 1: Basic transaction with error handling
KeyStateBackend backend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);

// Set up state
ValueStateDescriptor<String> desc = ValueStateDescriptor.build("test-state", String.class, "");
ValueState<String> state = backend.getValueState(desc);

backend.setCurrentKey("key1");
state.update("value1");

// Perform transaction
long checkpointId = System.currentTimeMillis();
try {
    backend.setCheckpointId(checkpointId);
    backend.finish(checkpointId);
    backend.commit(checkpointId);
    backend.ackCommit(checkpointId, System.currentTimeMillis());
    System.out.println("Transaction completed successfully");
} catch (Exception e) {
    backend.rollBack(checkpointId);
    System.err.println("Transaction failed, rolled back: " + e.getMessage());
}

// Example 2: Async commit pattern
ExecutorService executor = Executors.newSingleThreadExecutor();

backend.finish(checkpointId);

// Commit can be performed asynchronously. ackCommit runs inside the same task
// because it must be called on the same thread as commit; a whenComplete
// callback is not guaranteed to run on the executor thread.
CompletableFuture<Void> commitFuture = CompletableFuture.runAsync(() -> {
    try {
        backend.commit(checkpointId);
    } catch (Exception e) {
        throw new RuntimeException("Commit failed", e);
    }
    backend.ackCommit(checkpointId, System.currentTimeMillis());
}, executor);

commitFuture.whenComplete((result, throwable) -> {
    if (throwable == null) {
        System.out.println("Async commit completed");
    } else {
        backend.rollBack(checkpointId);
        System.err.println("Async commit failed: " + throwable.getMessage());
    }
});

// Stop accepting new tasks; the submitted commit task still runs to completion
executor.shutdown();

// Example 3: Multiple state operations in single transaction
backend.setCurrentKey("user123");
long txnId = 2001L;

// Modify multiple states
// (nameDesc, eventDesc, and counterDesc are state descriptors built beforehand)
ValueState<String> nameState = backend.getValueState(nameDesc);
ListState<String> eventState = backend.getListState(eventDesc);
MapState<String, Integer> counterState = backend.getMapState(counterDesc);

nameState.update("Alice");
eventState.add("login");
counterState.put("sessions", 1);

// Commit all changes atomically
try {
    backend.setCheckpointId(txnId);
    backend.finish(txnId);
    backend.commit(txnId);
    backend.ackCommit(txnId, System.currentTimeMillis());

    System.out.println("All state changes committed atomically");
} catch (Exception e) {
    backend.rollBack(txnId);
    System.err.println("All state changes rolled back due to failure");
}
```

### Error Handling and Recovery

A wrapper around the backend can track which phase each checkpoint has reached, so that a failure is reported together with its phase and followed by a rollback:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Custom exception handling with detailed recovery
public class StreamingStateTransactionManager {
    private final KeyStateBackend backend;
    private final Map<Long, TransactionState> activeTransactions = new ConcurrentHashMap<>();

    public StreamingStateTransactionManager(KeyStateBackend backend) {
        this.backend = backend;
    }

    public void performCheckpoint(long checkpointId) {
        TransactionState txnState = new TransactionState(checkpointId);
        activeTransactions.put(checkpointId, txnState);

        try {
            // Phase 1: Finish
            txnState.setPhase("FINISH");
            backend.finish(checkpointId);

            // Phase 2: Commit
            txnState.setPhase("COMMIT");
            backend.commit(checkpointId);

            // Phase 3: Acknowledge
            txnState.setPhase("ACK_COMMIT");
            backend.ackCommit(checkpointId, System.currentTimeMillis());

            txnState.setPhase("COMPLETED");
            System.out.println("Transaction " + checkpointId + " completed successfully");

        } catch (Exception e) {
            handleTransactionFailure(checkpointId, txnState, e);
        } finally {
            activeTransactions.remove(checkpointId);
        }
    }

    private void handleTransactionFailure(long checkpointId, TransactionState txnState, Exception error) {
        System.err.println("Transaction " + checkpointId + " failed in phase " + txnState.getPhase() + ": " + error.getMessage());

        try {
            backend.rollBack(checkpointId);
            System.out.println("Successfully rolled back transaction " + checkpointId);
        } catch (Exception rollbackError) {
            System.err.println("CRITICAL: Rollback failed for transaction " + checkpointId + ": " + rollbackError.getMessage());
            // Additional recovery logic would go here
        }
    }

    // Bookkeeping for one in-flight checkpoint
    private static class TransactionState {
        private final long checkpointId;
        private String phase;
        private final long startTime;

        public TransactionState(long checkpointId) {
            this.checkpointId = checkpointId;
            this.startTime = System.currentTimeMillis();
            this.phase = "INIT";
        }

        public void setPhase(String phase) { this.phase = phase; }
        public String getPhase() { return phase; }
    }
}
```