Core data management capabilities for CDAP including dataset operations, metadata management, lineage tracking, audit functionality, and data registry services for Hadoop-based applications.
—
Distributed transaction support with retry logic, consumer state management, and integration with Apache Tephra for ACID guarantees. The transaction management system provides essential capabilities for maintaining data consistency across distributed datasets and operations in the CDAP platform.
The primary interface for creating transaction executors with custom configuration and retry policies.
// CDAP extension of Tephra's executor factory. In addition to the factory
// methods inherited from org.apache.tephra.TransactionExecutorFactory, it can
// build an executor from a TransactionContextFactory so transaction contexts
// are created on demand (e.g. by a DynamicDatasetCache).
public interface TransactionExecutorFactory extends org.apache.tephra.TransactionExecutorFactory {
/**
 * Creates a new transaction executor with dynamic transaction context creation.
 * This allows for use of the factory with a DynamicDatasetCache.
 *
 * @param txContextFactory the TransactionContextFactory for creating new TransactionContext
 * @return a new instance of TransactionExecutor
 */
TransactionExecutor createExecutor(TransactionContextFactory txContextFactory);
}Factory interface for creating transaction contexts, enabling dynamic dataset cache integration with transaction support.
// Factory abstraction over TransactionContext creation; each call is expected
// to yield a context for a fresh transactional operation.
public interface TransactionContextFactory {
// Creates transaction contexts for dynamic dataset operations
TransactionContext create();
}CDAP Data Fabric integrates with Apache Tephra for distributed transaction support. The standard Tephra interfaces are used for core transaction operations:
// Standard Tephra transaction system client (from org.apache.tephra)
public interface TransactionSystemClient {
// Begins a short transaction.
Transaction startShort();
// Begins a long transaction (for long-running/batch operations).
Transaction startLong();
// Conflict detection: true when the change set can be committed
// (see the usage example further down, which gates commit on this).
boolean canCommit(Transaction tx, Collection<byte[]> changeIds);
// Attempts to commit; returns true on success.
boolean commit(Transaction tx);
// Rolls back the given transaction.
void abort(Transaction tx);
// ... other standard Tephra operations
}
// Transaction-aware interface for participating in transactions
// Lifecycle callbacks invoked by the transaction machinery on components
// (datasets, consumers) that take part in a transaction.
public interface TransactionAware {
// Binds the component to a newly started transaction.
void startTx(Transaction tx);
// Returns the change set this component contributes for conflict detection.
Collection<byte[]> getTxChanges();
// Persists pending writes as part of commit; an exception aborts the commit.
boolean commitTx() throws Exception;
// Invoked after the transaction has committed successfully.
void postTxCommit();
// Undoes uncommitted changes on rollback.
boolean rollbackTx() throws Exception;
}Transaction-aware stream processing components with consumer state management and coordination.
// Stream consumer with transaction support
// Combines Closeable resource management with TransactionAware participation,
// so consumption state is committed/rolled back with the enclosing transaction.
public interface StreamConsumer extends Closeable, TransactionAware {
// Transaction-aware stream consumption
// Blocks up to the given timeout waiting for the next event
// (assumed to return null on timeout — TODO confirm against implementation).
DequeInputDatum poll(long timeout, TimeUnit unit) throws InterruptedException;
// Delivers up to maxEvents events through the supplied callback.
void consume(int maxEvents, StreamConsumerCallback callback) throws InterruptedException;
// Consumer positioning and state
void seek(StreamEventOffset offset);
StreamEventOffset getPosition();
// Transaction-aware state management
@Override
void startTx(Transaction tx);
@Override
Collection<byte[]> getTxChanges();
@Override
boolean commitTx() throws Exception;
@Override
void postTxCommit();
@Override
boolean rollbackTx() throws Exception;
}
// Stream consumer factory with transaction integration
public interface StreamConsumerFactory {
// Creates a consumer starting from its default/initial position.
StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config);
// Overload that resumes from a previously saved consumer state
// (presumably a persisted offset — verify against callers).
StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config,
StreamConsumerState startState);
}
// Consumer state management for transaction coordination
// Generic holder pairing a state value with the timestamp of its last update.
public interface ConsumerState<T> {
T getState();
void setState(T state);
// Timestamp associated with the current state value.
long getTimestamp();
}
// Consumer state store with transaction support
// Persists per-consumer state keyed by S; being TransactionAware, saves and
// removals participate in the enclosing transaction.
public interface ConsumerStateStore<S, T> extends TransactionAware {
// Seeds the store with an initial state for the given key.
void configureState(S state, T initialState);
ConsumerState<T> getState(S state);
// Persists a new state value together with an explicit timestamp.
void saveState(S state, T stateValue, long timestamp);
void removeState(S state);
}Stream administration operations with transaction coordination support.
// Administrative operations over streams; all methods may fail with a
// checked Exception surfaced to the caller.
public interface StreamAdmin {
// Stream lifecycle management
void create(StreamId streamId) throws Exception;
void create(StreamId streamId, Map<String, String> properties) throws Exception;
void drop(StreamId streamId) throws Exception;
void truncate(StreamId streamId) throws Exception;
// Stream configuration
void updateConfig(StreamId streamId, StreamProperties properties) throws Exception;
StreamProperties getConfig(StreamId streamId) throws Exception;
// Stream metadata and statistics
StreamSpecification getSpecification(StreamId streamId) throws Exception;
List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception;
// Maintenance and existence checks (NOTE(review): these are administrative
// operations, not transaction coordination as previously labeled)
void upgrade() throws Exception;
boolean exists(StreamId streamId) throws Exception;
}// Access transaction system client (typically injected)
// Access transaction system client (typically injected)
TransactionSystemClient txClient = // ... obtain instance
// Start a short transaction
Transaction tx = txClient.startShort();
try {
    // Perform transactional operations
    performDatasetOperations(tx);
    // Collect the change set for conflict detection
    Collection<byte[]> changeIds = getChangeIds();
    if (txClient.canCommit(tx, changeIds) && txClient.commit(tx)) {
        System.out.println("Transaction committed successfully: " + tx.getTransactionId());
    } else {
        // canCommit/commit returned false (conflict): abort explicitly so the
        // transaction does not linger in progress — the original example
        // leaked the transaction on this path.
        txClient.abort(tx);
        System.out.println("Transaction aborted due to conflict: " + tx.getTransactionId());
    }
} catch (Exception e) {
    // Abort transaction on error
    txClient.abort(tx);
    System.out.println("Transaction aborted: " + tx.getTransactionId() + ", error: " + e.getMessage());
}
// Start a long transaction (note: startLong() takes no timeout argument;
// the original comment incorrectly claimed a timeout was supplied)
Transaction longTx = txClient.startLong();
try {
    performLongRunningOperation(longTx);
    // commitOrThrow raises TransactionFailureException instead of returning false
    txClient.commitOrThrow(longTx);
} catch (TransactionFailureException e) {
    System.out.println("Long transaction failed: " + e.getMessage());
    txClient.abort(longTx);
}// Create transaction executor with dataset instances
// Reads a record from dataset1, derives data into dataset2, and updates
// dataset1 — all three writes commit or roll back atomically.
public void executeTransactionalOperation(TransactionExecutorFactory txFactory,
KeyValueTable dataset1,
KeyValueTable dataset2) {
// Create executor with transaction-aware datasets
TransactionExecutor executor = txFactory.createExecutor(Arrays.asList(dataset1, dataset2));
// Execute transactional operation
executor.execute(new TransactionExecutor.Subroutine() {
@Override
public void apply() throws Exception {
// All operations within this block are transactional
byte[] key = Bytes.toBytes("user123");
// Read from first dataset
byte[] userData = dataset1.read(key);
if (userData != null) {
// Process data and write to second dataset
byte[] processedData = processUserData(userData);
dataset2.write(key, processedData);
// Update original dataset
dataset1.write(key, updatedUserData(userData));
}
System.out.println("Transactional operation completed for key: " + Bytes.toString(key));
}
});
}
// Execute with custom transaction configuration
// NOTE(review): `datasets` is assumed to be defined in surrounding context,
// and the two-argument createExecutor(datasets, config) overload is not
// declared in any interface shown in this document — confirm it exists.
TransactionExecutor.Configuration config = TransactionExecutor.Configuration.builder()
.setTimeout(30, TimeUnit.SECONDS)
.setMaxRetries(3)
.build();
TransactionExecutor customExecutor = txFactory.createExecutor(datasets, config);
customExecutor.execute(() -> {
// Custom transaction logic with specific timeout and retry settings
});// Create retrying transaction client for robust operations
// Wrap the plain client so transient failures are retried automatically.
// NOTE(review): this builder-style API (setDelegate/setMaxRetries/...) could
// not be verified against the shown interfaces — confirm the actual
// RetryingLongTransactionSystemClient construction in the CDAP sources.
RetryingLongTransactionSystemClient retryingClient =
RetryingLongTransactionSystemClient.builder()
.setDelegate(originalTxClient)
.setMaxRetries(5)
.setRetryDelay(1000) // 1 second
.setRetryStrategy(RetryStrategy.EXPONENTIAL_BACKOFF)
.build();
// Use retrying client for critical operations
// Runs a long transaction through the retrying client; retries apply to the
// client's operations (e.g. commitOrThrow), and exhaustion of retries is
// surfaced as TransactionFailureException.
public void performCriticalTransactionalOperation() {
Transaction tx = retryingClient.startLong();
try {
// Critical data operations that must succeed
performCriticalDataMigration(tx);
performCriticalIndexUpdate(tx);
// The retrying client will automatically retry on transient failures
retryingClient.commitOrThrow(tx);
System.out.println("Critical operation completed successfully");
} catch (TransactionFailureException e) {
// Even with retries, the operation failed
System.err.println("Critical operation failed after retries: " + e.getMessage());
retryingClient.abort(tx);
throw new RuntimeException("Critical operation could not be completed", e);
}
}// Transaction-aware stream consumer
// Consumes stream events and writes derived records to a dataset inside a
// single transaction: both the consumer's position and the dataset writes
// commit or roll back together (both objects are TransactionAware and are
// registered with the same executor).
public class TransactionalStreamProcessor {
private final StreamConsumer streamConsumer;
private final TransactionExecutorFactory txFactory;
private final KeyValueTable outputDataset;
public TransactionalStreamProcessor(StreamConsumer consumer,
TransactionExecutorFactory txFactory,
KeyValueTable outputDataset) {
this.streamConsumer = consumer;
this.txFactory = txFactory;
this.outputDataset = outputDataset;
}
// Processes up to 100 events per invocation; a callback error propagates out
// of consume(), causing the executor to roll back the whole batch.
public void processStreamTransactionally() {
// Create executor with both stream consumer and output dataset
TransactionExecutor executor = txFactory.createExecutor(
Arrays.asList(streamConsumer, outputDataset));
executor.execute(new TransactionExecutor.Subroutine() {
@Override
public void apply() throws Exception {
// Consume events from stream within transaction
streamConsumer.consume(100, new StreamConsumerCallback() {
@Override
public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
// Process stream event
byte[] processedData = processStreamEvent(event);
// Write processed data to dataset within same transaction
String key = extractKey(event);
outputDataset.write(Bytes.toBytes(key), processedData);
}
@Override
public void onFinish() throws Exception {
System.out.println("Batch processing completed");
}
@Override
public void onError(Exception error) throws Exception {
System.err.println("Error processing stream events: " + error.getMessage());
throw error; // This will cause transaction rollback
}
});
}
});
}
}
// Stream consumer state management: seed an initial offset for a consumer,
// then read/advance/persist that offset — each step in its own transaction.
public void manageStreamConsumerState(ConsumerStateStore<String, StreamEventOffset> stateStore,
TransactionExecutorFactory txFactory) {
final String stateKey = "analytics-processor";
final StreamEventOffset startingOffset = new StreamEventOffset(0, 0);
// The state store is TransactionAware, so register it with the executor.
TransactionExecutor executor = txFactory.createExecutor(Arrays.asList(stateStore));
// Transaction 1: seed the initial state.
executor.execute(new TransactionExecutor.Subroutine() {
@Override
public void apply() throws Exception {
stateStore.configureState(stateKey, startingOffset);
}
});
// Transaction 2: load the saved offset, process from it, persist the result.
executor.execute(new TransactionExecutor.Subroutine() {
@Override
public void apply() throws Exception {
ConsumerState<StreamEventOffset> saved = stateStore.getState(stateKey);
StreamEventOffset previousOffset = saved.getState();
StreamEventOffset nextOffset = processEventsFromOffset(previousOffset);
stateStore.saveState(stateKey, nextOffset, System.currentTimeMillis());
System.out.println("Updated consumer state from " + previousOffset + " to " + nextOffset);
}
});
}// Nested transaction pattern for complex operations
// Coordinates multi-dataset operations under one outer transaction.
// NOTE(review): despite the name, no independent nested transactions are
// started here — performDatasetSpecificOperations runs inside the single
// outer transaction (no inner executor is created).
public class NestedTransactionManager {
private final TransactionExecutorFactory txFactory;
public NestedTransactionManager(TransactionExecutorFactory txFactory) {
this.txFactory = txFactory;
}
// Runs outer logic, per-dataset batches, and a consistency check; any thrown
// exception rolls back everything at once.
public void performNestedTransactionalOperations(List<KeyValueTable> datasets) {
// Outer transaction for overall consistency
TransactionExecutor outerExecutor = txFactory.createExecutor(datasets);
outerExecutor.execute(() -> {
// Perform outer transaction operations
performOuterTransactionLogic(datasets);
// Inner operations that might need their own transaction semantics
for (KeyValueTable dataset : datasets) {
performDatasetSpecificOperations(dataset);
}
// Final consistency check
validateTransactionalConsistency(datasets);
});
}
// Per-dataset batch work; failures are wrapped and rethrown so the outer
// transaction rolls back.
private void performDatasetSpecificOperations(KeyValueTable dataset) {
// Dataset-specific operations within the outer transaction
try {
// Batch operations on single dataset
performBatchOperations(dataset);
} catch (Exception e) {
System.err.println("Dataset operation failed: " + e.getMessage());
throw new RuntimeException("Dataset operation failed", e);
}
}
}
// Transaction checkpoint pattern for long-running operations.
// Runs three phases under one long transaction, checkpointing between phase 1
// and phase 2 so earlier writes become visible to later phases.
public void performLongRunningTransactionalOperation(TransactionSystemClient txClient) {
    // Track the most recent transaction handle: checkpoint() returns a NEW
    // Transaction, and that latest handle is the one that must be committed
    // or aborted. The original example aborted the stale pre-checkpoint
    // handle on failure, leaving the checkpoint's write pointer dangling.
    Transaction currentTx = txClient.startLong();
    try {
        // Phase 1
        performOperationPhase1(currentTx);
        // Checkpoint the transaction; replaces the working handle
        currentTx = txClient.checkpoint(currentTx);
        System.out.println("Transaction checkpointed: " + currentTx.getTransactionId());
        // Phase 2 using checkpointed transaction
        performOperationPhase2(currentTx);
        // Phase 3
        performOperationPhase3(currentTx);
        // Final commit of the latest handle
        txClient.commitOrThrow(currentTx);
        System.out.println("Long-running transaction completed successfully");
    } catch (Exception e) {
        // Abort the latest handle so all write pointers (including those
        // added by the checkpoint) are rolled back
        txClient.abort(currentTx);
        System.err.println("Long-running transaction failed: " + e.getMessage());
        throw new RuntimeException("Long-running operation failed", e);
    }
}// Transaction monitoring utility
// Operational helper around the transaction system client: status reporting,
// state reset for recovery, and snapshot creation for backup.
// NOTE(review): printTransactionMetrics and saveSnapshotToFile are helper
// methods assumed to exist elsewhere; they are not shown in this document.
public class TransactionMonitor {
private final TransactionSystemClient txClient;
public TransactionMonitor(TransactionSystemClient txClient) {
this.txClient = txClient;
}
// Prints the transaction system's status string and metrics.
public void monitorTransactionSystem() {
// Get transaction system status
String status = txClient.getStatus();
System.out.println("Transaction System Status: " + status);
// Monitor transaction metrics
printTransactionMetrics();
}
// Resets transaction system state; failures are logged, not rethrown
// (best-effort recovery).
public void handleTransactionSystemRecovery() {
try {
// Reset transaction system state if needed
System.out.println("Resetting transaction system state...");
txClient.resetState();
System.out.println("Transaction system state reset completed");
} catch (Exception e) {
System.err.println("Failed to reset transaction system: " + e.getMessage());
}
}
// Captures a snapshot of transaction state to a timestamped file.
public void createTransactionSnapshot() {
try {
InputStream snapshotStream = txClient.getSnapshotInputStream();
// Save snapshot for backup/recovery
saveSnapshotToFile(snapshotStream, "tx-snapshot-" + System.currentTimeMillis());
System.out.println("Transaction snapshot created successfully");
} catch (TransactionCouldNotTakeSnapshotException e) {
System.err.println("Could not create transaction snapshot: " + e.getMessage());
}
}
}
// NOTE(review): in Apache Tephra, Transaction is a concrete class, not an
// interface — `interface Transaction extends org.apache.tephra.Transaction`
// as shown would not compile. Treat this as an API summary of the available
// accessors; confirm the real type against the CDAP/Tephra sources.
public interface Transaction extends org.apache.tephra.Transaction {
// Transaction identification and state
long getTransactionId();
long getVisibilityUpperBound();
Set<Long> getInvalids();
Set<Long> getInProgress();
// Transaction type and timeouts
TransactionType getType();
long getTimeout();
}
// Transaction executor configuration
// API sketch of a class nested under TransactionExecutor (usage elsewhere in
// this document is TransactionExecutor.Configuration.builder()); method
// bodies are intentionally omitted.
public static class Configuration {
public static Builder builder();
public int getMaxRetries();
public long getTimeoutMillis();
public TimeUnit getTimeoutUnit();
// Builder for timeout and retry settings.
public static class Builder {
public Builder setTimeout(long timeout, TimeUnit unit);
public Builder setMaxRetries(int maxRetries);
public Configuration build();
}
}
// Transaction-aware interface for CDAP components
// Re-declares the Tephra lifecycle methods and adds getTransactionAwareName
// (presumably a human-readable identifier for diagnostics — confirm usage).
public interface TransactionAware extends org.apache.tephra.TransactionAware {
@Override
void startTx(Transaction tx);
@Override
Collection<byte[]> getTxChanges();
@Override
boolean commitTx() throws Exception;
@Override
void postTxCommit();
@Override
boolean rollbackTx() throws Exception;
@Override
String getTransactionAwareName();
}
// Stream event and consumer types
// A single dequeued stream event: payload bytes, string headers, timestamp.
public interface DequeInputDatum {
byte[] getData();
Map<String, String> getHeaders();
long getTimestamp();
}
// Callback contract for StreamConsumer.consume: onEvent per event, onFinish
// when the batch completes, onError on failure. Exceptions thrown from any
// callback propagate to the caller (and, in the examples in this document,
// trigger transaction rollback).
public interface StreamConsumerCallback {
void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception;
void onFinish() throws Exception;
void onError(Exception error) throws Exception;
}
// Immutable position within a stream, identified by (generation, offset).
// API sketch: method bodies are intentionally omitted.
public final class StreamEventOffset {
public long getGeneration();
public long getOffset();
public StreamEventOffset(long generation, long offset);
}
// Consumer configuration
// Builder-constructed settings for stream consumers: dequeue timeout and
// maximum dequeue batch size. API sketch; bodies intentionally omitted.
public final class ConsumerConfig {
public static Builder builder();
public int getDequeueTimeout();
public int getMaxDequeueSize();
public static class Builder {
public Builder setDequeueTimeout(int timeout);
public Builder setMaxDequeueSize(int size);
public ConsumerConfig build();
}
}
// Retry strategy enumeration
// Delay policies for the retrying transaction client (see the
// RetryingLongTransactionSystemClient example in this document).
public enum RetryStrategy {
FIXED_DELAY,
LINEAR_BACKOFF,
EXPONENTIAL_BACKOFF,
CUSTOM
}
// Exception types
// Checked failure raised when a transaction cannot complete (e.g. by
// commitOrThrow in the examples above). API sketches; bodies omitted.
public class TransactionFailureException extends Exception {
public TransactionFailureException(String message);
public TransactionFailureException(String message, Throwable cause);
}
// Specialization thrown when an operation targets a transaction that is no
// longer in progress.
public class TransactionNotInProgressException extends TransactionFailureException {
public TransactionNotInProgressException(String message);
}
// Thrown by getSnapshotInputStream when a snapshot cannot be taken.
public class TransactionCouldNotTakeSnapshotException extends Exception {
public TransactionCouldNotTakeSnapshotException(String message);
public TransactionCouldNotTakeSnapshotException(String message, Throwable cause);
}Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-data-fabric