0
# Transaction Management
1
2
This module provides distributed transaction support with retry logic, consumer state management, and integration with Apache Tephra for ACID guarantees. The transaction management system provides essential capabilities for maintaining data consistency across distributed datasets and operations in the CDAP platform.
3
4
## Capabilities
5
6
### Core Transaction Factory
7
8
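The primary interface for creating transaction executors. It extends Tephra's `TransactionExecutorFactory` with an overload that builds the executor from a `TransactionContextFactory`, so that transaction contexts can be created dynamically (for example, together with a `DynamicDatasetCache`).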
9
10
```java { .api }
11
public interface TransactionExecutorFactory extends org.apache.tephra.TransactionExecutorFactory {
12
/**
13
* Creates a new transaction executor with dynamic transaction context creation.
14
* This allows for use of the factory with a DynamicDatasetCache.
15
*
16
* @param txContextFactory the TransactionContextFactory for creating new TransactionContext
17
* @return a new instance of TransactionExecutor
18
*/
19
TransactionExecutor createExecutor(TransactionContextFactory txContextFactory);
20
}
21
```
22
23
### Transaction Context Factory
24
25
Factory interface for creating transaction contexts, enabling dynamic dataset cache integration with transaction support.
26
27
```java { .api }
28
public interface TransactionContextFactory {
29
// Creates transaction contexts for dynamic dataset operations
30
TransactionContext create();
31
}
32
```
33
34
### Apache Tephra Integration
35
36
CDAP Data Fabric integrates with Apache Tephra for distributed transaction support. The standard Tephra interfaces are used for core transaction operations:
37
38
```java { .api }
39
// Standard Tephra transaction system client (from org.apache.tephra)
40
public interface TransactionSystemClient {
41
Transaction startShort();
42
Transaction startLong();
43
boolean canCommit(Transaction tx, Collection<byte[]> changeIds);
44
boolean commit(Transaction tx);
45
void abort(Transaction tx);
46
// ... other standard Tephra operations
47
}
48
49
// Transaction-aware interface for participating in transactions
50
public interface TransactionAware {
51
void startTx(Transaction tx);
52
Collection<byte[]> getTxChanges();
53
boolean commitTx() throws Exception;
54
void postTxCommit();
55
boolean rollbackTx() throws Exception;
56
}
57
```
58
59
### Stream Transaction Support
60
61
Transaction-aware stream processing components with consumer state management and coordination.
62
63
```java { .api }
64
// Stream consumer with transaction support
65
public interface StreamConsumer extends Closeable, TransactionAware {
66
// Transaction-aware stream consumption
67
DequeInputDatum poll(long timeout, TimeUnit unit) throws InterruptedException;
68
void consume(int maxEvents, StreamConsumerCallback callback) throws InterruptedException;
69
70
// Consumer positioning and state
71
void seek(StreamEventOffset offset);
72
StreamEventOffset getPosition();
73
74
// Transaction-aware state management
75
@Override
76
void startTx(Transaction tx);
77
@Override
78
Collection<byte[]> getTxChanges();
79
@Override
80
boolean commitTx() throws Exception;
81
@Override
82
void postTxCommit();
83
@Override
84
boolean rollbackTx() throws Exception;
85
}
86
87
// Stream consumer factory with transaction integration
88
public interface StreamConsumerFactory {
89
StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config);
90
StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config,
91
StreamConsumerState startState);
92
}
93
94
// Consumer state management for transaction coordination
95
public interface ConsumerState<T> {
96
T getState();
97
void setState(T state);
98
long getTimestamp();
99
}
100
101
// Consumer state store with transaction support
102
public interface ConsumerStateStore<S, T> extends TransactionAware {
103
void configureState(S state, T initialState);
104
ConsumerState<T> getState(S state);
105
void saveState(S state, T stateValue, long timestamp);
106
void removeState(S state);
107
}
108
```
109
110
### Stream Administration
111
112
Stream administration operations with transaction coordination support.
113
114
```java { .api }
115
public interface StreamAdmin {
116
// Stream lifecycle management
117
void create(StreamId streamId) throws Exception;
118
void create(StreamId streamId, Map<String, String> properties) throws Exception;
119
void drop(StreamId streamId) throws Exception;
120
void truncate(StreamId streamId) throws Exception;
121
122
// Stream configuration
123
void updateConfig(StreamId streamId, StreamProperties properties) throws Exception;
124
StreamProperties getConfig(StreamId streamId) throws Exception;
125
126
// Stream metadata and statistics
127
StreamSpecification getSpecification(StreamId streamId) throws Exception;
128
List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception;
129
130
// Upgrade and existence checks
131
void upgrade() throws Exception;
132
boolean exists(StreamId streamId) throws Exception;
133
}
134
```
135
136
## Usage Examples
137
138
### Basic Transaction Operations
139
140
```java
141
// Access transaction system client (typically injected)
142
TransactionSystemClient txClient = // ... obtain instance
143
144
// Start a short transaction
145
Transaction tx = txClient.startShort();
146
try {
147
// Perform transactional operations
148
performDatasetOperations(tx);
149
150
// Check if transaction can be committed
151
Collection<byte[]> changeIds = getChangeIds();
152
if (txClient.canCommit(tx, changeIds)) {
153
// Commit the transaction
154
boolean committed = txClient.commit(tx);
155
if (committed) {
156
System.out.println("Transaction committed successfully: " + tx.getTransactionId());
157
}
158
}
159
} catch (Exception e) {
160
// Abort transaction on error
161
txClient.abort(tx);
162
System.out.println("Transaction aborted: " + tx.getTransactionId() + ", error: " + e.getMessage());
163
}
164
165
// Start a long-running transaction (startLong() takes no timeout argument)
166
Transaction longTx = txClient.startLong();
167
try {
168
performLongRunningOperation(longTx);
169
txClient.commitOrThrow(longTx);
170
} catch (TransactionFailureException e) {
171
System.out.println("Long transaction failed: " + e.getMessage());
172
txClient.abort(longTx);
173
}
174
```
175
176
### Transaction Executor Usage
177
178
```java
179
// Create transaction executor with dataset instances
180
public void executeTransactionalOperation(TransactionExecutorFactory txFactory,
181
KeyValueTable dataset1,
182
KeyValueTable dataset2) {
183
184
// Create executor with transaction-aware datasets
185
TransactionExecutor executor = txFactory.createExecutor(Arrays.asList(dataset1, dataset2));
186
187
// Execute transactional operation
188
executor.execute(new TransactionExecutor.Subroutine() {
189
@Override
190
public void apply() throws Exception {
191
// All operations within this block are transactional
192
byte[] key = Bytes.toBytes("user123");
193
194
// Read from first dataset
195
byte[] userData = dataset1.read(key);
196
197
if (userData != null) {
198
// Process data and write to second dataset
199
byte[] processedData = processUserData(userData);
200
dataset2.write(key, processedData);
201
202
// Update original dataset
203
dataset1.write(key, updatedUserData(userData));
204
}
205
206
System.out.println("Transactional operation completed for key: " + Bytes.toString(key));
207
}
208
});
209
}
210
211
// Execute with custom transaction configuration
212
TransactionExecutor.Configuration config = TransactionExecutor.Configuration.builder()
213
.setTimeout(30, TimeUnit.SECONDS)
214
.setMaxRetries(3)
215
.build();
216
217
TransactionExecutor customExecutor = txFactory.createExecutor(datasets, config);
218
customExecutor.execute(() -> {
219
// Custom transaction logic with specific timeout and retry settings
220
});
221
```
222
223
### Retrying Transaction Client
224
225
```java
226
// Create retrying transaction client for robust operations
227
RetryingLongTransactionSystemClient retryingClient =
228
RetryingLongTransactionSystemClient.builder()
229
.setDelegate(originalTxClient)
230
.setMaxRetries(5)
231
.setRetryDelay(1000) // 1 second
232
.setRetryStrategy(RetryStrategy.EXPONENTIAL_BACKOFF)
233
.build();
234
235
// Use retrying client for critical operations
236
public void performCriticalTransactionalOperation() {
237
Transaction tx = retryingClient.startLong();
238
239
try {
240
// Critical data operations that must succeed
241
performCriticalDataMigration(tx);
242
performCriticalIndexUpdate(tx);
243
244
// The retrying client will automatically retry on transient failures
245
retryingClient.commitOrThrow(tx);
246
System.out.println("Critical operation completed successfully");
247
248
} catch (TransactionFailureException e) {
249
// Even with retries, the operation failed
250
System.err.println("Critical operation failed after retries: " + e.getMessage());
251
retryingClient.abort(tx);
252
throw new RuntimeException("Critical operation could not be completed", e);
253
}
254
}
255
```
256
257
### Stream Transaction Integration
258
259
```java
260
// Transaction-aware stream consumer
261
public class TransactionalStreamProcessor {
262
private final StreamConsumer streamConsumer;
263
private final TransactionExecutorFactory txFactory;
264
private final KeyValueTable outputDataset;
265
266
public TransactionalStreamProcessor(StreamConsumer consumer,
267
TransactionExecutorFactory txFactory,
268
KeyValueTable outputDataset) {
269
this.streamConsumer = consumer;
270
this.txFactory = txFactory;
271
this.outputDataset = outputDataset;
272
}
273
274
public void processStreamTransactionally() {
275
// Create executor with both stream consumer and output dataset
276
TransactionExecutor executor = txFactory.createExecutor(
277
Arrays.asList(streamConsumer, outputDataset));
278
279
executor.execute(new TransactionExecutor.Subroutine() {
280
@Override
281
public void apply() throws Exception {
282
// Consume events from stream within transaction
283
streamConsumer.consume(100, new StreamConsumerCallback() {
284
@Override
285
public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
286
// Process stream event
287
byte[] processedData = processStreamEvent(event);
288
289
// Write processed data to dataset within same transaction
290
String key = extractKey(event);
291
outputDataset.write(Bytes.toBytes(key), processedData);
292
}
293
294
@Override
295
public void onFinish() throws Exception {
296
System.out.println("Batch processing completed");
297
}
298
299
@Override
300
public void onError(Exception error) throws Exception {
301
System.err.println("Error processing stream events: " + error.getMessage());
302
throw error; // This will cause transaction rollback
303
}
304
});
305
}
306
});
307
}
308
}
309
310
// Stream consumer state management
311
public void manageStreamConsumerState(ConsumerStateStore<String, StreamEventOffset> stateStore,
312
TransactionExecutorFactory txFactory) {
313
314
String consumerId = "analytics-processor";
315
StreamEventOffset initialOffset = new StreamEventOffset(0, 0);
316
317
// Configure initial state
318
TransactionExecutor executor = txFactory.createExecutor(Arrays.asList(stateStore));
319
executor.execute(() -> {
320
stateStore.configureState(consumerId, initialOffset);
321
});
322
323
// Process with state updates
324
executor.execute(() -> {
325
ConsumerState<StreamEventOffset> currentState = stateStore.getState(consumerId);
326
StreamEventOffset currentOffset = currentState.getState();
327
328
// Process events and update state
329
StreamEventOffset newOffset = processEventsFromOffset(currentOffset);
330
stateStore.saveState(consumerId, newOffset, System.currentTimeMillis());
331
332
System.out.println("Updated consumer state from " + currentOffset + " to " + newOffset);
333
});
334
}
335
```
336
337
### Advanced Transaction Patterns
338
339
```java
340
// Grouping pattern for complex operations: several per-dataset steps run inside one outer transaction (no separate inner transactions are started)
341
public class NestedTransactionManager {
342
private final TransactionExecutorFactory txFactory;
343
344
public NestedTransactionManager(TransactionExecutorFactory txFactory) {
345
this.txFactory = txFactory;
346
}
347
348
public void performNestedTransactionalOperations(List<KeyValueTable> datasets) {
349
// Outer transaction for overall consistency
350
TransactionExecutor outerExecutor = txFactory.createExecutor(datasets);
351
352
outerExecutor.execute(() -> {
353
// Perform outer transaction operations
354
performOuterTransactionLogic(datasets);
355
356
// Per-dataset operations; these run inside the same outer transaction
357
for (KeyValueTable dataset : datasets) {
358
performDatasetSpecificOperations(dataset);
359
}
360
361
// Final consistency check
362
validateTransactionalConsistency(datasets);
363
});
364
}
365
366
private void performDatasetSpecificOperations(KeyValueTable dataset) {
367
// Dataset-specific operations within the outer transaction
368
try {
369
// Batch operations on single dataset
370
performBatchOperations(dataset);
371
} catch (Exception e) {
372
System.err.println("Dataset operation failed: " + e.getMessage());
373
throw new RuntimeException("Dataset operation failed", e);
374
}
375
}
376
}
377
378
// Transaction checkpoint pattern for long-running operations
379
public void performLongRunningTransactionalOperation(TransactionSystemClient txClient) {
380
Transaction tx = txClient.startLong();
381
382
try {
383
// Phase 1
384
performOperationPhase1(tx);
385
386
// Checkpoint the transaction
387
Transaction checkpointTx = txClient.checkpoint(tx);
388
System.out.println("Transaction checkpointed: " + checkpointTx.getTransactionId());
389
390
// Phase 2 using checkpointed transaction
391
performOperationPhase2(checkpointTx);
392
393
// Phase 3
394
performOperationPhase3(checkpointTx);
395
396
// Final commit
397
txClient.commitOrThrow(checkpointTx);
398
System.out.println("Long-running transaction completed successfully");
399
400
} catch (Exception e) {
401
txClient.abort(tx);
402
System.err.println("Long-running transaction failed: " + e.getMessage());
403
throw new RuntimeException("Long-running operation failed", e);
404
}
405
}
406
```
407
408
### Transaction Monitoring and Diagnostics
409
410
```java
411
// Transaction monitoring utility
412
public class TransactionMonitor {
413
private final TransactionSystemClient txClient;
414
415
public TransactionMonitor(TransactionSystemClient txClient) {
416
this.txClient = txClient;
417
}
418
419
public void monitorTransactionSystem() {
420
// Get transaction system status
421
String status = txClient.getStatus();
422
System.out.println("Transaction System Status: " + status);
423
424
// Monitor transaction metrics
425
printTransactionMetrics();
426
}
427
428
public void handleTransactionSystemRecovery() {
429
try {
430
// Reset transaction system state if needed
431
System.out.println("Resetting transaction system state...");
432
txClient.resetState();
433
System.out.println("Transaction system state reset completed");
434
435
} catch (Exception e) {
436
System.err.println("Failed to reset transaction system: " + e.getMessage());
437
}
438
}
439
440
public void createTransactionSnapshot() {
441
try {
442
InputStream snapshotStream = txClient.getSnapshotInputStream();
443
444
// Save snapshot for backup/recovery
445
saveSnapshotToFile(snapshotStream, "tx-snapshot-" + System.currentTimeMillis());
446
System.out.println("Transaction snapshot created successfully");
447
448
} catch (TransactionCouldNotTakeSnapshotException e) {
449
System.err.println("Could not create transaction snapshot: " + e.getMessage());
450
}
451
}
452
}
453
```
454
455
## Types
456
457
```java { .api }
458
// Core transaction types from Apache Tephra extended for CDAP
459
public interface Transaction extends org.apache.tephra.Transaction {
460
// Transaction identification and state
461
long getTransactionId();
462
long getVisibilityUpperBound();
463
Set<Long> getInvalids();
464
Set<Long> getInProgress();
465
466
// Transaction type and timeouts
467
TransactionType getType();
468
long getTimeout();
469
}
470
471
// Transaction executor configuration
472
public static class Configuration {
473
public static Builder builder();
474
475
public int getMaxRetries();
476
public long getTimeoutMillis();
477
public TimeUnit getTimeoutUnit();
478
479
public static class Builder {
480
public Builder setTimeout(long timeout, TimeUnit unit);
481
public Builder setMaxRetries(int maxRetries);
482
public Configuration build();
483
}
484
}
485
486
// Transaction-aware interface for CDAP components
487
public interface TransactionAware extends org.apache.tephra.TransactionAware {
488
@Override
489
void startTx(Transaction tx);
490
491
@Override
492
Collection<byte[]> getTxChanges();
493
494
@Override
495
boolean commitTx() throws Exception;
496
497
@Override
498
void postTxCommit();
499
500
@Override
501
boolean rollbackTx() throws Exception;
502
503
@Override
504
String getTransactionAwareName();
505
}
506
507
// Stream event and consumer types
508
public interface DequeInputDatum {
509
byte[] getData();
510
Map<String, String> getHeaders();
511
long getTimestamp();
512
}
513
514
public interface StreamConsumerCallback {
515
void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception;
516
void onFinish() throws Exception;
517
void onError(Exception error) throws Exception;
518
}
519
520
public final class StreamEventOffset {
521
public long getGeneration();
522
public long getOffset();
523
524
public StreamEventOffset(long generation, long offset);
525
}
526
527
// Consumer configuration
528
public final class ConsumerConfig {
529
public static Builder builder();
530
531
public int getDequeueTimeout();
532
public int getMaxDequeueSize();
533
534
public static class Builder {
535
public Builder setDequeueTimeout(int timeout);
536
public Builder setMaxDequeueSize(int size);
537
public ConsumerConfig build();
538
}
539
}
540
541
// Retry strategy enumeration
542
public enum RetryStrategy {
543
FIXED_DELAY,
544
LINEAR_BACKOFF,
545
EXPONENTIAL_BACKOFF,
546
CUSTOM
547
}
548
549
// Exception types
550
public class TransactionFailureException extends Exception {
551
public TransactionFailureException(String message);
552
public TransactionFailureException(String message, Throwable cause);
553
}
554
555
public class TransactionNotInProgressException extends TransactionFailureException {
556
public TransactionNotInProgressException(String message);
557
}
558
559
public class TransactionCouldNotTakeSnapshotException extends Exception {
560
public TransactionCouldNotTakeSnapshotException(String message);
561
public TransactionCouldNotTakeSnapshotException(String message, Throwable cause);
562
}
563
```