CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-data-fabric

Core data management capabilities for CDAP including dataset operations, metadata management, lineage tracking, audit functionality, and data registry services for Hadoop-based applications.

Pending
Overview
Eval results
Files

docs/transaction-management.md

Transaction Management

Distributed transaction support with retry logic, consumer state management, and integration with Apache Tephra for ACID guarantees. The transaction management system provides essential capabilities for maintaining data consistency across distributed datasets and operations in the CDAP platform.

Capabilities

Core Transaction Factory

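The primary interface for creating transaction executors. It extends Tephra's `TransactionExecutorFactory` with an overload that builds executors from a `TransactionContextFactory`, so that transaction contexts can be created dynamically (for example, when working with a `DynamicDatasetCache`).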

/**
 * CDAP extension of Tephra's {@code TransactionExecutorFactory}.
 *
 * <p>Besides the factory methods inherited from Tephra, an executor can be built from a
 * {@link TransactionContextFactory}. The executor then creates its {@code TransactionContext}
 * instances dynamically, which is what allows this factory to be used together with a
 * {@code DynamicDatasetCache}.
 */
public interface TransactionExecutorFactory extends org.apache.tephra.TransactionExecutorFactory {

    /**
     * Builds a {@link TransactionExecutor} that obtains new transaction contexts from the given
     * factory instead of from a fixed set of participants.
     *
     * @param txContextFactory source of new {@code TransactionContext} instances
     * @return a newly created executor
     */
    TransactionExecutor createExecutor(TransactionContextFactory txContextFactory);
}

Transaction Context Factory

Factory interface for creating transaction contexts, enabling dynamic dataset cache integration with transaction support.

/**
 * Supplies {@link TransactionContext} instances on demand, so that dynamic dataset operations
 * can take part in transactions.
 */
public interface TransactionContextFactory {

    /**
     * Creates a transaction context.
     *
     * @return a {@code TransactionContext} for the caller to use
     */
    TransactionContext create();
}

Apache Tephra Integration

CDAP Data Fabric integrates with Apache Tephra for distributed transaction support. Core transaction operations use the standard Tephra interfaces; the listings below are abridged to the operations used on this page:

// Standard Tephra transaction system client (from org.apache.tephra).
// Abridged: lists every operation the usage examples on this page rely on.
public interface TransactionSystemClient {
    // --- Transaction lifecycle ---
    Transaction startShort();
    // Takes no timeout argument.
    Transaction startLong();
    // Both return false when the transaction cannot be committed; the caller must then abort it.
    boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException;
    boolean commit(Transaction tx) throws TransactionNotInProgressException;
    // Commits, reporting failure with an exception instead of a boolean.
    void commitOrThrow(Transaction tx) throws TransactionFailureException;
    void abort(Transaction tx);
    // Returns a new Transaction object; later calls (including abort) must use the returned one.
    Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException;

    // --- Administration and diagnostics ---
    String getStatus();
    void resetState();
    InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException;

    // ... other standard Tephra operations
}

// Transaction-aware interface for participating in transactions (from org.apache.tephra).
// Abridged listing of the callbacks a participant implements.
public interface TransactionAware {
    void startTx(Transaction tx);
    Collection<byte[]> getTxChanges();
    boolean commitTx() throws Exception;
    void postTxCommit();
    boolean rollbackTx() throws Exception;
    // Returns a name for this participant.
    String getTransactionAwareName();
    // NOTE(review): Tephra versions with checkpoint support also declare
    // void updateTx(Transaction tx); confirm against the Tephra version in use.
}

Stream Transaction Support

Transaction-aware stream processing components with consumer state management and coordination.

// Stream consumer with transaction support
/**
 * Reads events from a stream and takes part in transactions through the
 * {@code TransactionAware} callbacks it re-declares below.
 */
public interface StreamConsumer extends Closeable, TransactionAware {

    // ---- Positioning ----------------------------------------------------

    /** Moves this consumer to the given offset. */
    void seek(StreamEventOffset offset);

    /** Returns the offset this consumer is currently at. */
    StreamEventOffset getPosition();

    // ---- Consumption ----------------------------------------------------

    /**
     * Polls for input, waiting at most {@code timeout} in the given {@code unit}.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    DequeInputDatum poll(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Delivers at most {@code maxEvents} events to {@code callback}.
     *
     * @throws InterruptedException if interrupted while consuming
     */
    void consume(int maxEvents, StreamConsumerCallback callback) throws InterruptedException;

    // ---- TransactionAware callbacks (re-declared for visibility) --------

    @Override
    void startTx(Transaction tx);

    @Override
    Collection<byte[]> getTxChanges();

    @Override
    boolean commitTx() throws Exception;

    @Override
    void postTxCommit();

    @Override
    boolean rollbackTx() throws Exception;
}

// Stream consumer factory with transaction integration
/** Creates {@link StreamConsumer} instances for a stream. */
public interface StreamConsumerFactory {

    /**
     * Creates a consumer for {@code streamId} without an explicit start state.
     *
     * @param streamId  the stream to consume
     * @param namespace NOTE(review): separate from the stream id; presumably scopes the
     *                  consumer's stored state — confirm
     * @param config    consumer configuration
     */
    StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config);

    /**
     * Creates a consumer for {@code streamId} that starts from {@code startState}.
     *
     * @param streamId   the stream to consume
     * @param namespace  see the three-argument overload
     * @param config     consumer configuration
     * @param startState state the consumer starts from
     */
    StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config,
                          StreamConsumerState startState);
}

// Consumer state management for transaction coordination
/**
 * A consumer's state value together with the timestamp associated with it.
 *
 * @param <T> type of the state value
 */
public interface ConsumerState<T> {

    /** @return the current state value */
    T getState();

    /** Replaces the current state value. */
    void setState(T state);

    /**
     * @return the timestamp associated with this state. NOTE(review): the usage example on this
     *         page saves {@code System.currentTimeMillis()}; confirm epoch millis is the contract.
     */
    long getTimestamp();
}

// Consumer state store with transaction support
/**
 * Transaction-aware store of per-consumer state.
 *
 * @param <S> key identifying a consumer (a {@code String} consumer id in the usage example)
 * @param <T> state value type (a {@code StreamEventOffset} in the usage example)
 */
public interface ConsumerStateStore<S, T> extends TransactionAware {

    /** Registers the consumer identified by {@code state} with {@code initialState}. */
    void configureState(S state, T initialState);

    /** Returns the stored state of the consumer identified by {@code state}. */
    ConsumerState<T> getState(S state);

    /** Stores {@code stateValue} together with {@code timestamp} for the given consumer. */
    void saveState(S state, T stateValue, long timestamp);

    /** Removes the stored state of the given consumer. */
    void removeState(S state);
}

Stream Administration

Stream administration operations: stream lifecycle, configuration, metadata lookup, and upgrade.

/** Administrative operations on streams. */
public interface StreamAdmin {
    // Stream lifecycle management
    void create(StreamId streamId) throws Exception;
    void create(StreamId streamId, Map<String, String> properties) throws Exception;
    void drop(StreamId streamId) throws Exception;
    void truncate(StreamId streamId) throws Exception;
    boolean exists(StreamId streamId) throws Exception;

    // Stream configuration
    void updateConfig(StreamId streamId, StreamProperties properties) throws Exception;
    StreamProperties getConfig(StreamId streamId) throws Exception;

    // Stream metadata
    StreamSpecification getSpecification(StreamId streamId) throws Exception;
    List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception;

    // Maintenance. This is not a transaction-coordination operation.
    // NOTE(review): presumably migrates stream storage during a platform upgrade; confirm.
    void upgrade() throws Exception;
}

Usage Examples

Basic Transaction Operations

// Access the transaction system client (typically injected).
// obtainTransactionSystemClient() stands in for however your application obtains it.
TransactionSystemClient txClient = obtainTransactionSystemClient();

// Start a short transaction
Transaction tx = txClient.startShort();
// Set only after a successful commit; every other outcome aborts the transaction in finally.
boolean committed = false;
try {
    // Perform transactional operations
    performDatasetOperations(tx);

    // canCommit() and commit() can report rejection by returning false. A rejected
    // transaction is still in progress and must be aborted, not silently dropped.
    Collection<byte[]> changeIds = getChangeIds();
    if (txClient.canCommit(tx, changeIds) && txClient.commit(tx)) {
        committed = true;
        System.out.println("Transaction committed successfully: " + tx.getTransactionId());
    } else {
        System.out.println("Transaction could not be committed: " + tx.getTransactionId());
    }
} catch (Exception e) {
    System.out.println("Transaction aborted: " + tx.getTransactionId() + ", error: " + e.getMessage());
} finally {
    if (!committed) {
        txClient.abort(tx);
    }
}

// Start a long transaction. Note that startLong() takes no timeout argument, so make sure
// every exit path either commits or aborts it.
Transaction longTx = txClient.startLong();
boolean longTxCommitted = false;
try {
    performLongRunningOperation(longTx);
    txClient.commitOrThrow(longTx);
    longTxCommitted = true;
} catch (TransactionFailureException e) {
    System.out.println("Long transaction failed: " + e.getMessage());
} finally {
    // Also covers unexpected runtime exceptions from performLongRunningOperation.
    if (!longTxCommitted) {
        txClient.abort(longTx);
    }
}

Transaction Executor Usage

/**
 * Reads a record from {@code dataset1}, writes a processed copy to {@code dataset2}, and
 * updates the original, all inside a single transaction that includes both datasets.
 *
 * @throws RuntimeException wrapping the failure if the transaction fails or the thread is
 *                          interrupted (the interrupt flag is restored first)
 */
public void executeTransactionalOperation(TransactionExecutorFactory txFactory,
                                          KeyValueTable dataset1,
                                          KeyValueTable dataset2) {

    // Create executor with transaction-aware datasets
    TransactionExecutor executor = txFactory.createExecutor(Arrays.asList(dataset1, dataset2));

    try {
        // execute() declares TransactionFailureException and InterruptedException
        executor.execute(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                // All operations within this block are transactional
                byte[] key = Bytes.toBytes("user123");

                // Read from first dataset
                byte[] userData = dataset1.read(key);

                if (userData != null) {
                    // Process data and write to second dataset
                    byte[] processedData = processUserData(userData);
                    dataset2.write(key, processedData);

                    // Update original dataset
                    dataset1.write(key, updatedUserData(userData));
                }

                System.out.println("Transactional operation completed for key: " + Bytes.toString(key));
            }
        });
    } catch (TransactionFailureException e) {
        throw new RuntimeException("Transactional operation failed", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while executing transactional operation", e);
    }
}

// Execute with custom transaction configuration.
// NOTE(review): the TransactionExecutorFactory listing on this page declares only
// createExecutor(TransactionContextFactory) plus whatever it inherits from Tephra. Confirm that
// TransactionExecutor.Configuration and a createExecutor(datasets, config) overload exist in
// the version you use before relying on this snippet.
// `txFactory` and `datasets` are assumed to be defined by the surrounding code.
TransactionExecutor.Configuration config = TransactionExecutor.Configuration.builder()
    .setTimeout(30, TimeUnit.SECONDS)
    .setMaxRetries(3)
    .build();

// NOTE(review): Tephra's TransactionExecutor.execute() declares TransactionFailureException and
// InterruptedException; the enclosing code must catch or declare them.
TransactionExecutor customExecutor = txFactory.createExecutor(datasets, config);
customExecutor.execute(() -> {
    // Custom transaction logic with specific timeout and retry settings
});

Retrying Transaction Client

// Create retrying transaction client for robust operations.
// NOTE(review): confirm that this builder API exists; the CDAP class of this name may instead
// be constructed directly from a delegate client and a retry strategy. `originalTxClient` is
// assumed to be an existing TransactionSystemClient.
RetryingLongTransactionSystemClient retryingClient = 
    RetryingLongTransactionSystemClient.builder()
        .setDelegate(originalTxClient)
        .setMaxRetries(5)
        .setRetryDelay(1000) // 1 second (unit implied by this comment only; confirm milliseconds)
        .setRetryStrategy(RetryStrategy.EXPONENTIAL_BACKOFF)
        .build();

// Use retrying client for critical operations.
// The transaction is aborted on every path that does not end in a successful commit,
// including unexpected runtime exceptions thrown by the data operations.
public void performCriticalTransactionalOperation() {
    Transaction tx = retryingClient.startLong();
    boolean committed = false;

    try {
        // Critical data operations that must succeed
        performCriticalDataMigration(tx);
        performCriticalIndexUpdate(tx);

        // NOTE(review): which failures are retried (and whether commit itself is) depends on
        // the RetryingLongTransactionSystemClient implementation; confirm.
        retryingClient.commitOrThrow(tx);
        committed = true;
        System.out.println("Critical operation completed successfully");

    } catch (TransactionFailureException e) {
        // Even with retries, the operation failed
        System.err.println("Critical operation failed after retries: " + e.getMessage());
        throw new RuntimeException("Critical operation could not be completed", e);
    } finally {
        if (!committed) {
            retryingClient.abort(tx);
        }
    }
}

Stream Transaction Integration

// Transaction-aware stream consumer: consumes a batch of stream events and writes derived
// values to a dataset. The stream consumer and the dataset take part in the same transaction.
public class TransactionalStreamProcessor {
    private final StreamConsumer streamConsumer;
    private final TransactionExecutorFactory txFactory;
    private final KeyValueTable outputDataset;

    public TransactionalStreamProcessor(StreamConsumer consumer,
                                        TransactionExecutorFactory txFactory,
                                        KeyValueTable outputDataset) {
        this.streamConsumer = consumer;
        this.txFactory = txFactory;
        this.outputDataset = outputDataset;
    }

    /**
     * Consumes up to 100 stream events and writes one processed entry per event to the output
     * dataset, inside a single transaction.
     *
     * @throws RuntimeException wrapping the failure if the transaction fails or the thread is
     *                          interrupted (the interrupt flag is restored first)
     */
    public void processStreamTransactionally() {
        // Register both participants so consumption and writes share one transaction
        TransactionExecutor executor = txFactory.createExecutor(
            Arrays.asList(streamConsumer, outputDataset));

        try {
            executor.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    // Consume events from stream within transaction
                    streamConsumer.consume(100, new StreamConsumerCallback() {
                        @Override
                        public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
                            // Process stream event
                            byte[] processedData = processStreamEvent(event);

                            // Write processed data to dataset within same transaction
                            String key = extractKey(event);
                            outputDataset.write(Bytes.toBytes(key), processedData);
                        }

                        @Override
                        public void onFinish() throws Exception {
                            System.out.println("Batch processing completed");
                        }

                        @Override
                        public void onError(Exception error) throws Exception {
                            System.err.println("Error processing stream events: " + error.getMessage());
                            // Propagating out of apply() makes the transaction fail
                            throw error;
                        }
                    });
                }
            });
        } catch (TransactionFailureException e) {
            throw new RuntimeException("Transactional stream processing failed", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while processing stream events", e);
        }
    }
}

// Stream consumer state management: configure a consumer's initial offset, then advance it.
// Each step runs in its own transaction.
public void manageStreamConsumerState(ConsumerStateStore<String, StreamEventOffset> stateStore,
                                      TransactionExecutorFactory txFactory) {

    String consumerId = "analytics-processor";
    StreamEventOffset initialOffset = new StreamEventOffset(0, 0);

    TransactionExecutor executor = txFactory.createExecutor(Arrays.asList(stateStore));
    try {
        // Transaction 1: configure initial state
        executor.execute(() -> {
            stateStore.configureState(consumerId, initialOffset);
        });

        // Transaction 2: read the current offset, process events, save the new offset
        executor.execute(() -> {
            ConsumerState<StreamEventOffset> currentState = stateStore.getState(consumerId);
            StreamEventOffset currentOffset = currentState.getState();

            StreamEventOffset newOffset = processEventsFromOffset(currentOffset);
            stateStore.saveState(consumerId, newOffset, System.currentTimeMillis());

            System.out.println("Updated consumer state from " + currentOffset + " to " + newOffset);
        });
    } catch (TransactionFailureException e) {
        throw new RuntimeException("Failed to update consumer state for " + consumerId, e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while updating consumer state for " + consumerId, e);
    }
}

Advanced Transaction Patterns

// Composing several groups of operations inside ONE transaction. Despite the class name there
// is no inner transaction: the per-dataset work below runs in the outer transaction and
// commits or aborts together with it.
public class NestedTransactionManager {
    private final TransactionExecutorFactory txFactory;

    public NestedTransactionManager(TransactionExecutorFactory txFactory) {
        this.txFactory = txFactory;
    }

    /**
     * Runs outer logic, per-dataset operations and a final consistency check in one transaction.
     *
     * @throws RuntimeException wrapping the failure if the transaction fails or the thread is
     *                          interrupted (the interrupt flag is restored first)
     */
    public void performNestedTransactionalOperations(List<KeyValueTable> datasets) {
        // createExecutor takes an Iterable<TransactionAware>. A List<KeyValueTable> is not one
        // (generics are invariant), so copy it into a list with the wider element type.
        TransactionExecutor outerExecutor =
            txFactory.createExecutor(new java.util.ArrayList<TransactionAware>(datasets));

        try {
            outerExecutor.execute(() -> {
                // Perform outer transaction operations
                performOuterTransactionLogic(datasets);

                // Per-dataset operations; these share the outer transaction
                for (KeyValueTable dataset : datasets) {
                    performDatasetSpecificOperations(dataset);
                }

                // Final consistency check
                validateTransactionalConsistency(datasets);
            });
        } catch (TransactionFailureException e) {
            throw new RuntimeException("Transactional operations failed", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while executing transactional operations", e);
        }
    }

    // Runs batch operations on one dataset; any failure propagates and fails the whole transaction.
    private void performDatasetSpecificOperations(KeyValueTable dataset) {
        try {
            performBatchOperations(dataset);
        } catch (Exception e) {
            System.err.println("Dataset operation failed: " + e.getMessage());
            throw new RuntimeException("Dataset operation failed", e);
        }
    }
}

// Transaction checkpoint pattern for long-running operations.
// checkpoint() returns a new Transaction object that supersedes the one passed in. All later
// calls, including abort() on failure, must use the most recent object, because the
// returned object also carries the checkpoint's write pointers.
public void performLongRunningTransactionalOperation(TransactionSystemClient txClient) {
    // Always holds the latest Transaction from startLong()/checkpoint().
    Transaction tx = txClient.startLong();

    try {
        // Phase 1
        performOperationPhase1(tx);

        // Checkpoint the transaction; the remaining phases and the commit use the result
        tx = txClient.checkpoint(tx);
        System.out.println("Transaction checkpointed: " + tx.getTransactionId());

        // Phase 2 using checkpointed transaction
        performOperationPhase2(tx);

        // Phase 3
        performOperationPhase3(tx);

        // Final commit
        txClient.commitOrThrow(tx);
        System.out.println("Long-running transaction completed successfully");

    } catch (Exception e) {
        // Aborts the latest (possibly checkpointed) transaction, not the original object
        txClient.abort(tx);
        System.err.println("Long-running transaction failed: " + e.getMessage());
        throw new RuntimeException("Long-running operation failed", e);
    }
}

Transaction Monitoring and Diagnostics

// Transaction monitoring utility: status reporting, state reset and snapshot backup.
public class TransactionMonitor {
    private final TransactionSystemClient txClient;

    public TransactionMonitor(TransactionSystemClient txClient) {
        this.txClient = txClient;
    }

    /** Prints the transaction system status, then transaction metrics. */
    public void monitorTransactionSystem() {
        String status = txClient.getStatus();
        System.out.println("Transaction System Status: " + status);

        // Monitor transaction metrics
        printTransactionMetrics();
    }

    /**
     * Resets the transaction system state. Best-effort: failures are reported, not rethrown.
     *
     * <p>WARNING: this calls resetState() unconditionally. NOTE(review): per Tephra's
     * documentation, resetting discards the transaction manager's state (in-progress and
     * invalid transactions); invoke it only as a deliberate recovery step.
     */
    public void handleTransactionSystemRecovery() {
        try {
            System.out.println("Resetting transaction system state...");
            txClient.resetState();
            System.out.println("Transaction system state reset completed");

        } catch (Exception e) {
            System.err.println("Failed to reset transaction system: " + e.getMessage());
        }
    }

    /** Saves a snapshot of the transaction state to a timestamped file for backup/recovery. */
    public void createTransactionSnapshot() {
        // try-with-resources closes the snapshot stream even when saving it fails
        try (InputStream snapshotStream = txClient.getSnapshotInputStream()) {
            saveSnapshotToFile(snapshotStream, "tx-snapshot-" + System.currentTimeMillis());
            System.out.println("Transaction snapshot created successfully");

        } catch (TransactionCouldNotTakeSnapshotException e) {
            System.err.println("Could not create transaction snapshot: " + e.getMessage());
        } catch (java.io.IOException e) {
            // Raised while saving or closing the snapshot stream
            System.err.println("Could not save transaction snapshot: " + e.getMessage());
        }
    }
}

Types

// Core transaction types from Apache Tephra extended for CDAP
// NOTE(review): in Tephra, org.apache.tephra.Transaction is a concrete class, and an interface
// cannot extend a class. This listing presumably summarizes the accessors available on a
// transaction. Also confirm the accessor types (Tephra exposes invalid/in-progress ids as
// arrays) and whether getTimeout() exists; the examples on this page only call
// getTransactionId().
public interface Transaction extends org.apache.tephra.Transaction {
    // Transaction identification and state
    long getTransactionId();
    long getVisibilityUpperBound();
    Set<Long> getInvalids();
    Set<Long> getInProgress();
    
    // Transaction type and timeouts
    TransactionType getType();
    long getTimeout();
}

// Transaction executor configuration
// NOTE(review): shown without its enclosing type; the usage example refers to it as
// TransactionExecutor.Configuration. Confirm where (and whether) it is declared, since this
// page's TransactionExecutorFactory listing has no method that accepts it.
public static class Configuration {
    // Entry point for building a Configuration.
    public static Builder builder();
    
    public int getMaxRetries();
    // NOTE(review): presumably the configured timeout converted to milliseconds; confirm.
    public long getTimeoutMillis();
    // The unit passed to Builder.setTimeout (assumed; confirm).
    public TimeUnit getTimeoutUnit();
    
    // Fluent builder; each setter returns the builder for chaining.
    public static class Builder {
        public Builder setTimeout(long timeout, TimeUnit unit);
        public Builder setMaxRetries(int maxRetries);
        public Configuration build();
    }
}

// Transaction-aware interface for CDAP components
/**
 * Participant in a transaction.
 *
 * <p>NOTE(review): every method here re-declares one inherited from
 * {@code org.apache.tephra.TransactionAware} and adds nothing new. Confirm whether CDAP
 * defines such a type or simply uses Tephra's interface directly.
 */
public interface TransactionAware extends org.apache.tephra.TransactionAware {

    /** Returns a name identifying this participant. */
    @Override
    String getTransactionAwareName();

    /** Called with the transaction this participant is joining. */
    @Override
    void startTx(Transaction tx);

    /** Returns the change ids this participant produced in the current transaction. */
    @Override
    Collection<byte[]> getTxChanges();

    /** Commit callback; returns whether this participant committed successfully. */
    @Override
    boolean commitTx() throws Exception;

    /** Called after the transaction has been committed. */
    @Override
    void postTxCommit();

    /** Rollback callback; returns whether this participant rolled back successfully. */
    @Override
    boolean rollbackTx() throws Exception;
}

// Stream event and consumer types

/** A single input item delivered to stream consumers and their callbacks. */
public interface DequeInputDatum {

    /** @return the raw payload bytes */
    byte[] getData();

    /** @return header key/value pairs attached to the item */
    Map<String, String> getHeaders();

    /** @return the item's timestamp. NOTE(review): unit not shown here; confirm. */
    long getTimestamp();
}

/** Receives events delivered by {@code StreamConsumer.consume(int, StreamConsumerCallback)}. */
public interface StreamConsumerCallback {

    /**
     * Handles one consumed event.
     *
     * @param event         the consumed event
     * @param eventMetadata NOTE(review): metadata accompanying the event; its exact content is
     *                      not defined on this page; confirm
     */
    void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception;

    /** Called once consumption of the batch is finished. */
    void onFinish() throws Exception;

    /** Called with the error raised during consumption. */
    void onError(Exception error) throws Exception;
}

/** Position in a stream, expressed as a generation plus an offset. */
public final class StreamEventOffset {

    /**
     * @param generation the stream generation this offset belongs to
     * @param offset     position within that generation
     */
    public StreamEventOffset(long generation, long offset);

    public long getGeneration();

    public long getOffset();
}

// Consumer configuration
/** Immutable consumer settings, created through {@link Builder}. */
public final class ConsumerConfig {

    /** Starts building a new configuration. */
    public static Builder builder();

    /** NOTE(review): unit of the dequeue timeout is not shown here; confirm. */
    public int getDequeueTimeout();

    public int getMaxDequeueSize();

    /** Fluent builder; setters return the builder for chaining. */
    public static class Builder {
        public Builder setDequeueTimeout(int timeout);

        public Builder setMaxDequeueSize(int size);

        public ConsumerConfig build();
    }
}

// Retry strategy enumeration
/**
 * Named retry policies. NOTE(review): the delay schedule behind each constant is not defined on
 * this page; CUSTOM presumably denotes a caller-supplied policy; confirm.
 */
public enum RetryStrategy {
    FIXED_DELAY, LINEAR_BACKOFF, EXPONENTIAL_BACKOFF, CUSTOM
}

// Exception types

/** Checked exception reporting that a transaction operation (such as a commit) failed. */
public class TransactionFailureException extends Exception {

    /** Creates the exception with a message and the underlying cause. */
    public TransactionFailureException(String message, Throwable cause);

    /** Creates the exception with a message only. */
    public TransactionFailureException(String message);
}

/** A {@link TransactionFailureException} for a transaction that is not in progress. */
public class TransactionNotInProgressException extends TransactionFailureException {

    /** Creates the exception with a message. */
    public TransactionNotInProgressException(String message);
}

/** Checked exception thrown when a transaction state snapshot cannot be produced. */
public class TransactionCouldNotTakeSnapshotException extends Exception {

    /** Creates the exception with a message and the underlying cause. */
    public TransactionCouldNotTakeSnapshotException(String message, Throwable cause);

    /** Creates the exception with a message only. */
    public TransactionCouldNotTakeSnapshotException(String message);
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-data-fabric

docs

audit-compliance.md

dataset-management.md

index.md

metadata-management.md

namespace-management.md

stream-processing.md

transaction-management.md

usage-registry.md

tile.json