
tessl/maven-co-cask-cdap--cdap-spark-core

Core Spark 1.x integration component for the Cask Data Application Platform (CDAP), providing runtime services and an execution context for CDAP applications.


docs/transaction-management.md

Transaction Management

Transaction handling for Spark operations within the CDAP platform. These capabilities provide ACID properties, enable consistent data access across distributed Spark executors, and integrate with CDAP's transactional data systems.

Capabilities

Spark Transaction Handler

Manages transaction lifecycle for Spark jobs and stages, coordinating with CDAP's transaction system to ensure data consistency during distributed Spark execution.

/**
 * Handles transaction lifecycle for Spark jobs within CDAP
 * Coordinates with TransactionSystemClient to manage ACID properties
 */
public class SparkTransactionHandler {
    /**
     * Marks the start of a Spark job without transaction context
     * @param jobId Spark job identifier
     * @param stageIds Set of stage IDs associated with this job
     */
    public void jobStarted(Integer jobId, Set<Integer> stageIds);
    
    /**
     * Marks the start of a Spark job with transaction context
     * @param jobId Spark job identifier  
     * @param stageIds Set of stage IDs associated with this job
     * @param txInfo Transaction information for this job
     */
    public void jobStarted(Integer jobId, Set<Integer> stageIds, SparkTransactional.TransactionInfo txInfo);
    
    /**
     * Marks the completion of a Spark job
     * @param jobId Spark job identifier
     * @param succeeded Whether the job completed successfully
     */
    public void jobEnded(Integer jobId, boolean succeeded);
    
    /**
     * Marks the submission of a Spark stage
     * @param stageId Spark stage identifier
     */
    public void stageSubmitted(Integer stageId);
    
    /**
     * Marks the completion of a Spark stage
     * @param stageId Spark stage identifier
     */
    public void stageCompleted(Integer stageId);
    
    /**
     * Gets the current transaction for a job if available
     * @param jobId Spark job identifier
     * @return Optional transaction info, empty if no transaction active
     */
    public Optional<SparkTransactional.TransactionInfo> getJobTransaction(Integer jobId);
}
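In practice these callbacks are driven by a Spark listener as a job progresses: job start, per-stage submit and complete, then job end. The following self-contained sketch records one plausible call sequence; the `LifecycleSketch` class and its recorder list are hypothetical stand-ins, not CDAP APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in mirroring the SparkTransactionHandler callback
// order a Spark listener would drive for one job with two stages.
public class LifecycleSketch {
    static final List<String> events = new ArrayList<>();

    static void jobStarted(Integer jobId, Set<Integer> stageIds) {
        events.add("jobStarted:" + jobId);
    }
    static void stageSubmitted(Integer stageId) {
        events.add("stageSubmitted:" + stageId);
    }
    static void stageCompleted(Integer stageId) {
        events.add("stageCompleted:" + stageId);
    }
    static void jobEnded(Integer jobId, boolean succeeded) {
        events.add("jobEnded:" + jobId + ":" + succeeded);
    }

    public static void main(String[] args) {
        jobStarted(1, new HashSet<>(Arrays.asList(10, 11)));
        stageSubmitted(10);
        stageCompleted(10);
        stageSubmitted(11);
        stageCompleted(11);
        jobEnded(1, true);
        System.out.println(events);
    }
}
```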

Spark Transactional

Provides transactional execution context for Spark operations, enabling dataset operations within transaction boundaries and proper commit/rollback behavior.

/**
 * Transactional execution context for Spark operations
 * Enables ACID properties for dataset access within Spark applications
 */
public class SparkTransactional {
    /**
     * Retrieves transaction information by key
     * @param key Transaction key identifier
     * @return TransactionInfo containing transaction details, or null if not found
     */
    public TransactionInfo getTransactionInfo(String key);
    
    /**
     * Executes a runnable within a transaction context
     * @param type Transaction type (SHORT or LONG)
     * @param runnable Code to execute within transaction
     * @throws TransactionFailureException if transaction fails
     */
    public void execute(TransactionType type, TxRunnable runnable) throws TransactionFailureException;
    
    /**
     * Executes a callable within a transaction context and returns result
     * @param type Transaction type (SHORT or LONG)
     * @param callable Code to execute within transaction
     * @return Result of the callable execution
     * @throws TransactionFailureException if transaction fails
     */
    public <T> T execute(TransactionType type, TxCallable<T> callable) throws TransactionFailureException;
    
    /**
     * Gets the current transaction context if available
     * @return Current TransactionContext, or null if no transaction active
     */
    public TransactionContext getCurrentTransactionContext();
}
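The contract of the `execute` methods is: commit when the body returns normally, roll back when it throws. That behavior can be modeled with a toy in-memory store; nothing below is a CDAP API, it only illustrates the commit/rollback semantics described above.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of execute()'s contract: apply writes to a working copy and
// publish them to the committed store only if the body does not throw.
public class TxSketch {
    interface TxRunnable { void run(Map<String, String> store) throws Exception; }

    static final Map<String, String> committed = new HashMap<>();

    static void execute(TxRunnable body) throws Exception {
        Map<String, String> working = new HashMap<>(committed); // snapshot
        body.run(working);          // may throw
        committed.clear();          // reached only on success: commit
        committed.putAll(working);
    }

    public static void main(String[] args) throws Exception {
        execute(store -> store.put("key", "value"));         // commits
        try {
            execute(store -> {
                store.put("key", "oops");
                throw new IllegalStateException("fail");     // triggers rollback
            });
        } catch (Exception e) {
            // the failed write never reaches the committed store
        }
        System.out.println(committed.get("key")); // prints "value"
    }
}
```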

Transaction Information and Types

Transaction metadata and execution context classes that provide access to transaction state and configuration.

/**
 * Container for transaction execution information.
 * Declared as a nested class of SparkTransactional, hence the static modifier.
 */
public static class TransactionInfo {
    /**
     * Gets the underlying transaction object
     * @return Transaction instance
     */
    public Transaction getTransaction();
    
    /**
     * Indicates whether transaction should commit when job ends
     * @return true if auto-commit is enabled
     */
    public boolean commitOnJobEnded();
    
    /**
     * Gets the transaction timeout in seconds
     * @return Timeout value in seconds
     */
    public int getTimeoutInSeconds();
}

/**
 * Enumeration of transaction types supported by CDAP
 */
public enum TransactionType {
    /** Short-lived transactions for quick operations */
    SHORT,
    /** Long-running transactions for complex operations */
    LONG
}

/**
 * Functional interface for transactional operations without return value
 */
@FunctionalInterface
public interface TxRunnable {
    void run(TransactionContext context) throws Exception;
}

/**
 * Functional interface for transactional operations with return value
 */
@FunctionalInterface  
public interface TxCallable<T> {
    T call(TransactionContext context) throws Exception;
}
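Because both interfaces carry `@FunctionalInterface`, callers can pass plain lambdas, and `TxCallable` additionally returns a value. A minimal sketch using local copies of the two interfaces; the `runInTx`/`callInTx` executors are hypothetical stand-ins for `SparkTransactional.execute`, and a plain `String` label replaces the real `TransactionContext`.

```java
// Local copies of the doc's functional interfaces, to show lambda usage.
public class LambdaSketch {
    @FunctionalInterface
    interface TxRunnable { void run(String context) throws Exception; }

    @FunctionalInterface
    interface TxCallable<T> { T call(String context) throws Exception; }

    // Hypothetical executors standing in for SparkTransactional.execute(...)
    static void runInTx(TxRunnable r) throws Exception { r.run("tx-1"); }
    static <T> T callInTx(TxCallable<T> c) throws Exception { return c.call("tx-1"); }

    public static void main(String[] args) throws Exception {
        runInTx(ctx -> System.out.println("running in " + ctx)); // TxRunnable lambda
        int rows = callInTx(ctx -> 42);                          // TxCallable lambda
        System.out.println(rows); // prints 42
    }
}
```

Distinct method names are used here because overloading a single `execute` on `TxRunnable` vs `TxCallable` forces callers to use explicitly typed lambdas to avoid ambiguity.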

Usage Examples

Basic Transaction Management:

import co.cask.cdap.app.runtime.spark.SparkTransactionHandler;
import co.cask.cdap.app.runtime.spark.SparkTransactional;
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionInfo;
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionType;
import com.google.common.collect.Sets;

import java.util.Set;

// Transaction handler setup (typically done by CDAP framework)
SparkTransactionHandler txHandler = new SparkTransactionHandler(txSystemClient);

// Job starts with transaction
Set<Integer> stageIds = Sets.newHashSet(1, 2, 3);
TransactionInfo txInfo = sparkTransactional.getTransactionInfo("tx-key-123");
txHandler.jobStarted(jobId, stageIds, txInfo);

// Execute operations within transaction
sparkTransactional.execute(TransactionType.SHORT, (context) -> {
    // Dataset operations within transaction
    Table table = context.getDataset("myTable");
    table.write("key", "value");
    // Transaction automatically committed if no exceptions
});

// Job completion
txHandler.jobEnded(jobId, true);

Dataset Access with Transactions (Scala):

import co.cask.cdap.app.runtime.spark.SparkTransactional
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionType
import co.cask.cdap.api.dataset.table.Table

// Transactional dataset access in Spark
val result = sparkTransactional.execute(TransactionType.LONG, (context) => {
    val inputTable = context.getDataset[Table]("input")
    val outputTable = context.getDataset[Table]("output")
    
    // Read from input table
    val data = inputTable.read("sourceKey")
    
    // Process and write to output table
    val processed = processData(data)
    outputTable.write("targetKey", processed)
    
    processed
})

Transaction Context in Spark Jobs:

// Check for active transaction in Spark job
SparkListenerJobStart jobStart = ...;
String txKey = jobStart.getProperties().getProperty(SparkTransactional.ACTIVE_TRANSACTION_KEY);

if (txKey != null && !txKey.isEmpty()) {
    TransactionInfo txInfo = sparkTransactional.getTransactionInfo(txKey);
    if (txInfo != null) {
        LOG.info("Job {} running with transaction: {}, auto-commit: {}", 
                 jobStart.jobId(), txInfo.getTransaction(), txInfo.commitOnJobEnded());
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-spark-core
