or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

audit-compliance.md dataset-management.md index.md metadata-management.md namespace-management.md stream-processing.md transaction-management.md usage-registry.md

docs/transaction-management.md

0

# Transaction Management

1

2

This module provides distributed transaction support with retry logic, consumer state management, and integration with Apache Tephra for ACID guarantees. The transaction management system provides essential capabilities for maintaining data consistency across distributed datasets and operations in the CDAP platform.

3

4

## Capabilities

5

6

### Core Transaction Factory

7

8

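The primary interface for creating transaction executors. It extends Tephra's `TransactionExecutorFactory` and adds a method that creates an executor from a `TransactionContextFactory`, so that transaction contexts can be created dynamically.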

9

10

```java { .api }

11

public interface TransactionExecutorFactory extends org.apache.tephra.TransactionExecutorFactory {

12

/**

13

* Creates a new transaction executor with dynamic transaction context creation.

14

* This allows for use of the factory with a DynamicDatasetCache.

15

*

16

* @param txContextFactory the TransactionContextFactory for creating new TransactionContext

17

* @return a new instance of TransactionExecutor

18

*/

19

TransactionExecutor createExecutor(TransactionContextFactory txContextFactory);

20

}

21

```

22

23

### Transaction Context Factory

24

25

Factory interface for creating transaction contexts, enabling dynamic dataset cache integration with transaction support.

26

27

```java { .api }

28

public interface TransactionContextFactory {

29

// Creates transaction contexts for dynamic dataset operations

30

TransactionContext create();

31

}

32

```

33

34

### Apache Tephra Integration

35

36

CDAP Data Fabric integrates with Apache Tephra for distributed transaction support. The standard Tephra interfaces are used for core transaction operations:

37

38

```java { .api }

39

// Standard Tephra transaction system client (from org.apache.tephra)

40

public interface TransactionSystemClient {

41

Transaction startShort();

42

Transaction startLong();

43

boolean canCommit(Transaction tx, Collection<byte[]> changeIds);

44

boolean commit(Transaction tx);

45

void abort(Transaction tx);

46

// ... other standard Tephra operations

47

}

48

49

// Transaction-aware interface for participating in transactions

50

public interface TransactionAware {

51

void startTx(Transaction tx);

52

Collection<byte[]> getTxChanges();

53

boolean commitTx() throws Exception;

54

void postTxCommit();

55

boolean rollbackTx() throws Exception;

56

}

57

```

58

59

### Stream Transaction Support

60

61

Transaction-aware stream processing components with consumer state management and coordination.

62

63

```java { .api }

64

// Stream consumer with transaction support

65

public interface StreamConsumer extends Closeable, TransactionAware {

66

// Transaction-aware stream consumption

67

DequeInputDatum poll(long timeout, TimeUnit unit) throws InterruptedException;

68

void consume(int maxEvents, StreamConsumerCallback callback) throws InterruptedException;

69

70

// Consumer positioning and state

71

void seek(StreamEventOffset offset);

72

StreamEventOffset getPosition();

73

74

// Transaction-aware state management

75

@Override

76

void startTx(Transaction tx);

77

@Override

78

Collection<byte[]> getTxChanges();

79

@Override

80

boolean commitTx() throws Exception;

81

@Override

82

void postTxCommit();

83

@Override

84

boolean rollbackTx() throws Exception;

85

}

86

87

// Stream consumer factory with transaction integration

88

public interface StreamConsumerFactory {

89

StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config);

90

StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config,

91

StreamConsumerState startState);

92

}

93

94

// Consumer state management for transaction coordination

95

public interface ConsumerState<T> {

96

T getState();

97

void setState(T state);

98

long getTimestamp();

99

}

100

101

// Consumer state store with transaction support

102

public interface ConsumerStateStore<S, T> extends TransactionAware {

103

void configureState(S state, T initialState);

104

ConsumerState<T> getState(S state);

105

void saveState(S state, T stateValue, long timestamp);

106

void removeState(S state);

107

}

108

```

109

110

### Stream Administration

111

112

Stream administration operations with transaction coordination support.

113

114

```java { .api }

115

public interface StreamAdmin {

116

// Stream lifecycle management

117

void create(StreamId streamId) throws Exception;

118

void create(StreamId streamId, Map<String, String> properties) throws Exception;

119

void drop(StreamId streamId) throws Exception;

120

void truncate(StreamId streamId) throws Exception;

121

122

// Stream configuration

123

void updateConfig(StreamId streamId, StreamProperties properties) throws Exception;

124

StreamProperties getConfig(StreamId streamId) throws Exception;

125

126

// Stream metadata and statistics

127

StreamSpecification getSpecification(StreamId streamId) throws Exception;

128

List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception;

129

130

// Upgrade and existence checks

131

void upgrade() throws Exception;

132

boolean exists(StreamId streamId) throws Exception;

133

}

134

```

135

136

## Usage Examples

137

138

### Basic Transaction Operations

139

140

```java

141

// Access transaction system client (typically injected)

142

TransactionSystemClient txClient = // ... obtain instance

143

144

// Start a short transaction

145

Transaction tx = txClient.startShort();

146

try {

147

// Perform transactional operations

148

performDatasetOperations(tx);

149

150

// Check if transaction can be committed

151

Collection<byte[]> changeIds = getChangeIds();

152

if (txClient.canCommit(tx, changeIds)) {

153

// Commit the transaction

154

boolean committed = txClient.commit(tx);

155

if (committed) {

156

System.out.println("Transaction committed successfully: " + tx.getTransactionId());

157

}

158

}

159

} catch (Exception e) {

160

// Abort transaction on error

161

txClient.abort(tx);

162

System.out.println("Transaction aborted: " + tx.getTransactionId() + ", error: " + e.getMessage());

163

}

164

165

// Start a long transaction (startLong() takes no timeout argument)

166

Transaction longTx = txClient.startLong();

167

try {

168

performLongRunningOperation(longTx);

169

txClient.commitOrThrow(longTx);

170

} catch (TransactionFailureException e) {

171

System.out.println("Long transaction failed: " + e.getMessage());

172

txClient.abort(longTx);

173

}

174

```

175

176

### Transaction Executor Usage

177

178

```java

179

// Create transaction executor with dataset instances

180

public void executeTransactionalOperation(TransactionExecutorFactory txFactory,

181

KeyValueTable dataset1,

182

KeyValueTable dataset2) {

183

184

// Create executor with transaction-aware datasets

185

TransactionExecutor executor = txFactory.createExecutor(Arrays.asList(dataset1, dataset2));

186

187

// Execute transactional operation

188

executor.execute(new TransactionExecutor.Subroutine() {

189

@Override

190

public void apply() throws Exception {

191

// All operations within this block are transactional

192

byte[] key = Bytes.toBytes("user123");

193

194

// Read from first dataset

195

byte[] userData = dataset1.read(key);

196

197

if (userData != null) {

198

// Process data and write to second dataset

199

byte[] processedData = processUserData(userData);

200

dataset2.write(key, processedData);

201

202

// Update original dataset

203

dataset1.write(key, updatedUserData(userData));

204

}

205

206

System.out.println("Transactional operation completed for key: " + Bytes.toString(key));

207

}

208

});

209

}

210

211

// Execute with custom transaction configuration

212

TransactionExecutor.Configuration config = TransactionExecutor.Configuration.builder()

213

.setTimeout(30, TimeUnit.SECONDS)

214

.setMaxRetries(3)

215

.build();

216

217

TransactionExecutor customExecutor = txFactory.createExecutor(datasets, config);

218

customExecutor.execute(() -> {

219

// Custom transaction logic with specific timeout and retry settings

220

});

221

```

222

223

### Retrying Transaction Client

224

225

```java

226

// Create retrying transaction client for robust operations

227

RetryingLongTransactionSystemClient retryingClient =

228

RetryingLongTransactionSystemClient.builder()

229

.setDelegate(originalTxClient)

230

.setMaxRetries(5)

231

.setRetryDelay(1000) // 1 second

232

.setRetryStrategy(RetryStrategy.EXPONENTIAL_BACKOFF)

233

.build();

234

235

// Use retrying client for critical operations

236

public void performCriticalTransactionalOperation() {

237

Transaction tx = retryingClient.startLong();

238

239

try {

240

// Critical data operations that must succeed

241

performCriticalDataMigration(tx);

242

performCriticalIndexUpdate(tx);

243

244

// The retrying client will automatically retry on transient failures

245

retryingClient.commitOrThrow(tx);

246

System.out.println("Critical operation completed successfully");

247

248

} catch (TransactionFailureException e) {

249

// Even with retries, the operation failed

250

System.err.println("Critical operation failed after retries: " + e.getMessage());

251

retryingClient.abort(tx);

252

throw new RuntimeException("Critical operation could not be completed", e);

253

}

254

}

255

```

256

257

### Stream Transaction Integration

258

259

```java

260

// Transaction-aware stream consumer

261

public class TransactionalStreamProcessor {

262

private final StreamConsumer streamConsumer;

263

private final TransactionExecutorFactory txFactory;

264

private final KeyValueTable outputDataset;

265

266

public TransactionalStreamProcessor(StreamConsumer consumer,

267

TransactionExecutorFactory txFactory,

268

KeyValueTable outputDataset) {

269

this.streamConsumer = consumer;

270

this.txFactory = txFactory;

271

this.outputDataset = outputDataset;

272

}

273

274

public void processStreamTransactionally() {

275

// Create executor with both stream consumer and output dataset

276

TransactionExecutor executor = txFactory.createExecutor(

277

Arrays.asList(streamConsumer, outputDataset));

278

279

executor.execute(new TransactionExecutor.Subroutine() {

280

@Override

281

public void apply() throws Exception {

282

// Consume events from stream within transaction

283

streamConsumer.consume(100, new StreamConsumerCallback() {

284

@Override

285

public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {

286

// Process stream event

287

byte[] processedData = processStreamEvent(event);

288

289

// Write processed data to dataset within same transaction

290

String key = extractKey(event);

291

outputDataset.write(Bytes.toBytes(key), processedData);

292

}

293

294

@Override

295

public void onFinish() throws Exception {

296

System.out.println("Batch processing completed");

297

}

298

299

@Override

300

public void onError(Exception error) throws Exception {

301

System.err.println("Error processing stream events: " + error.getMessage());

302

throw error; // This will cause transaction rollback

303

}

304

});

305

}

306

});

307

}

308

}

309

310

// Stream consumer state management

311

public void manageStreamConsumerState(ConsumerStateStore<String, StreamEventOffset> stateStore,

312

TransactionExecutorFactory txFactory) {

313

314

String consumerId = "analytics-processor";

315

StreamEventOffset initialOffset = new StreamEventOffset(0, 0);

316

317

// Configure initial state

318

TransactionExecutor executor = txFactory.createExecutor(Arrays.asList(stateStore));

319

executor.execute(() -> {

320

stateStore.configureState(consumerId, initialOffset);

321

});

322

323

// Process with state updates

324

executor.execute(() -> {

325

ConsumerState<StreamEventOffset> currentState = stateStore.getState(consumerId);

326

StreamEventOffset currentOffset = currentState.getState();

327

328

// Process events and update state

329

StreamEventOffset newOffset = processEventsFromOffset(currentOffset);

330

stateStore.saveState(consumerId, newOffset, System.currentTimeMillis());

331

332

System.out.println("Updated consumer state from " + currentOffset + " to " + newOffset);

333

});

334

}

335

```

336

337

### Advanced Transaction Patterns

338

339

```java

340

// Nested transaction pattern for complex operations

341

public class NestedTransactionManager {

342

private final TransactionExecutorFactory txFactory;

343

344

public NestedTransactionManager(TransactionExecutorFactory txFactory) {

345

this.txFactory = txFactory;

346

}

347

348

public void performNestedTransactionalOperations(List<KeyValueTable> datasets) {

349

// Outer transaction for overall consistency

350

TransactionExecutor outerExecutor = txFactory.createExecutor(datasets);

351

352

outerExecutor.execute(() -> {

353

// Perform outer transaction operations

354

performOuterTransactionLogic(datasets);

355

356

// Per-dataset operations; these run inside the same outer transaction, not in separate nested transactions

357

for (KeyValueTable dataset : datasets) {

358

performDatasetSpecificOperations(dataset);

359

}

360

361

// Final consistency check

362

validateTransactionalConsistency(datasets);

363

});

364

}

365

366

private void performDatasetSpecificOperations(KeyValueTable dataset) {

367

// Dataset-specific operations within the outer transaction

368

try {

369

// Batch operations on single dataset

370

performBatchOperations(dataset);

371

} catch (Exception e) {

372

System.err.println("Dataset operation failed: " + e.getMessage());

373

throw new RuntimeException("Dataset operation failed", e);

374

}

375

}

376

}

377

378

// Transaction checkpoint pattern for long-running operations

379

public void performLongRunningTransactionalOperation(TransactionSystemClient txClient) {

380

Transaction tx = txClient.startLong();

381

382

try {

383

// Phase 1

384

performOperationPhase1(tx);

385

386

// Checkpoint the transaction

387

Transaction checkpointTx = txClient.checkpoint(tx);

388

System.out.println("Transaction checkpointed: " + checkpointTx.getTransactionId());

389

390

// Phase 2 using checkpointed transaction

391

performOperationPhase2(checkpointTx);

392

393

// Phase 3

394

performOperationPhase3(checkpointTx);

395

396

// Final commit

397

txClient.commitOrThrow(checkpointTx);

398

System.out.println("Long-running transaction completed successfully");

399

400

} catch (Exception e) {

401

txClient.abort(tx);

402

System.err.println("Long-running transaction failed: " + e.getMessage());

403

throw new RuntimeException("Long-running operation failed", e);

404

}

405

}

406

```

407

408

### Transaction Monitoring and Diagnostics

409

410

```java

411

// Transaction monitoring utility

412

public class TransactionMonitor {

413

private final TransactionSystemClient txClient;

414

415

public TransactionMonitor(TransactionSystemClient txClient) {

416

this.txClient = txClient;

417

}

418

419

public void monitorTransactionSystem() {

420

// Get transaction system status

421

String status = txClient.getStatus();

422

System.out.println("Transaction System Status: " + status);

423

424

// Monitor transaction metrics

425

printTransactionMetrics();

426

}

427

428

public void handleTransactionSystemRecovery() {

429

try {

430

// Reset transaction system state if needed

431

System.out.println("Resetting transaction system state...");

432

txClient.resetState();

433

System.out.println("Transaction system state reset completed");

434

435

} catch (Exception e) {

436

System.err.println("Failed to reset transaction system: " + e.getMessage());

437

}

438

}

439

440

public void createTransactionSnapshot() {

441

try {

442

InputStream snapshotStream = txClient.getSnapshotInputStream();

443

444

// Save snapshot for backup/recovery

445

saveSnapshotToFile(snapshotStream, "tx-snapshot-" + System.currentTimeMillis());

446

System.out.println("Transaction snapshot created successfully");

447

448

} catch (TransactionCouldNotTakeSnapshotException e) {

449

System.err.println("Could not create transaction snapshot: " + e.getMessage());

450

}

451

}

452

}

453

```

454

455

## Types

456

457

```java { .api }

458

// Core transaction types from Apache Tephra extended for CDAP

459

public interface Transaction extends org.apache.tephra.Transaction {

460

// Transaction identification and state

461

long getTransactionId();

462

long getVisibilityUpperBound();

463

Set<Long> getInvalids();

464

Set<Long> getInProgress();

465

466

// Transaction type and timeouts

467

TransactionType getType();

468

long getTimeout();

469

}

470

471

// Transaction executor configuration

472

public static class Configuration {

473

public static Builder builder();

474

475

public int getMaxRetries();

476

public long getTimeoutMillis();

477

public TimeUnit getTimeoutUnit();

478

479

public static class Builder {

480

public Builder setTimeout(long timeout, TimeUnit unit);

481

public Builder setMaxRetries(int maxRetries);

482

public Configuration build();

483

}

484

}

485

486

// Transaction-aware interface for CDAP components

487

public interface TransactionAware extends org.apache.tephra.TransactionAware {

488

@Override

489

void startTx(Transaction tx);

490

491

@Override

492

Collection<byte[]> getTxChanges();

493

494

@Override

495

boolean commitTx() throws Exception;

496

497

@Override

498

void postTxCommit();

499

500

@Override

501

boolean rollbackTx() throws Exception;

502

503

@Override

504

String getTransactionAwareName();

505

}

506

507

// Stream event and consumer types

508

public interface DequeInputDatum {

509

byte[] getData();

510

Map<String, String> getHeaders();

511

long getTimestamp();

512

}

513

514

public interface StreamConsumerCallback {

515

void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception;

516

void onFinish() throws Exception;

517

void onError(Exception error) throws Exception;

518

}

519

520

public final class StreamEventOffset {

521

public long getGeneration();

522

public long getOffset();

523

524

public StreamEventOffset(long generation, long offset);

525

}

526

527

// Consumer configuration

528

public final class ConsumerConfig {

529

public static Builder builder();

530

531

public int getDequeueTimeout();

532

public int getMaxDequeueSize();

533

534

public static class Builder {

535

public Builder setDequeueTimeout(int timeout);

536

public Builder setMaxDequeueSize(int size);

537

public ConsumerConfig build();

538

}

539

}

540

541

// Retry strategy enumeration

542

public enum RetryStrategy {

543

FIXED_DELAY,

544

LINEAR_BACKOFF,

545

EXPONENTIAL_BACKOFF,

546

CUSTOM

547

}

548

549

// Exception types

550

public class TransactionFailureException extends Exception {

551

public TransactionFailureException(String message);

552

public TransactionFailureException(String message, Throwable cause);

553

}

554

555

public class TransactionNotInProgressException extends TransactionFailureException {

556

public TransactionNotInProgressException(String message);

557

}

558

559

public class TransactionCouldNotTakeSnapshotException extends Exception {

560

public TransactionCouldNotTakeSnapshotException(String message);

561

public TransactionCouldNotTakeSnapshotException(String message, Throwable cause);

562

}

563

```