Core application programming interface (API) for the Cask Data Application Platform (CDAP), used to develop scalable data processing applications on the Hadoop ecosystem.
CDAP provides built-in support for ACID transactions across datasets, offering both declarative transaction control through annotations and programmatic transaction management through transactional interfaces.
The `Transactional` interface executes operations within a transaction, ensuring ACID properties across dataset operations.
```java
/**
 * Interface for executing operations within transactions
 */
public interface Transactional {
  /**
   * Executes a TxRunnable within a transaction
   * @param runnable the operations to execute in the transaction
   * @throws TransactionFailureException if transaction fails
   */
  void execute(TxRunnable runnable) throws TransactionFailureException;

  /**
   * Executes a TxRunnable within a transaction with timeout
   * @param timeoutInSeconds transaction timeout in seconds
   * @param runnable the operations to execute in the transaction
   * @throws TransactionFailureException if transaction fails
   */
  void execute(int timeoutInSeconds, TxRunnable runnable) throws TransactionFailureException;
}
```

Functional interface for defining transactional operations with dataset access.
```java
/**
 * Runnable that provides DatasetContext for transactional dataset operations
 */
@FunctionalInterface
public interface TxRunnable {
  /**
   * Execute operations with access to datasets within a transaction
   * @param context provides access to datasets
   * @throws Exception if operation fails
   */
  void run(DatasetContext context) throws Exception;
}
```

Declarative transaction control using annotations for method-level transaction management.
```java
/**
 * Annotation to control transaction behavior for program lifecycle methods
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface TransactionPolicy {
  TransactionControl value();
}

/**
 * Enum defining transaction control modes
 */
public enum TransactionControl {
  EXPLICIT, // Method controls its own transactions
  IMPLICIT  // Platform manages transactions automatically
}
```

Usage Examples:
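```java
import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.annotation.TransactionControl;
import co.cask.cdap.api.annotation.TransactionPolicy;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.worker.AbstractWorker;
import co.cask.cdap.api.worker.WorkerContext;
import org.apache.tephra.TransactionFailureException;

import java.util.List;

// DataRecord, processRecord(), performComplexProcessing(), setupExternalConnections(),
// loadConfiguration(), processWorkItem() and closeExternalConnections() stand for
// application code and are not part of the CDAP API. Each public class below
// belongs in its own source file.

// Programmatic transaction management
public class TransactionalService {
  public void processData(Transactional transactional, List<DataRecord> records)
      throws TransactionFailureException {
    transactional.execute(context -> {
      KeyValueTable table = context.getDataset("processed-data");
      for (DataRecord record : records) {
        // All operations within this block are part of one transaction
        String key = record.getId();
        String value = processRecord(record);
        table.write(key, value);
      }
      // The transaction commits if this block completes normally
      // and is rolled back if it throws
    });
  }

  // Transaction with a custom timeout
  public void processLargeDataset(Transactional transactional, List<DataRecord> largeDataset)
      throws TransactionFailureException {
    transactional.execute(300, context -> { // 300-second (5-minute) timeout
      KeyValueTable table = context.getDataset("large-processed-data");
      // Long-running operation that needs an extended timeout
      for (DataRecord record : largeDataset) {
        performComplexProcessing(record, table);
      }
    });
  }
}

// Declarative transaction control with annotations
public class MyWorker extends AbstractWorker {

  @Override
  @TransactionPolicy(TransactionControl.EXPLICIT)
  public void initialize(WorkerContext context) throws Exception {
    // Keep the context so that getContext() works in run() and destroy()
    super.initialize(context);
    // This method runs without a platform-managed transaction, which suits
    // initialization that may exceed the transaction timeout
    setupExternalConnections();
    // Programmatic transactions are still available when needed
    context.execute(datasetContext -> {
      // This block runs in a transaction
      KeyValueTable config = datasetContext.getDataset("worker-config");
      loadConfiguration(config);
    });
  }

  @Override
  public void run() {
    // A worker's run() is not wrapped in a platform-managed transaction
    // (@TransactionPolicy governs lifecycle methods such as initialize() and
    // destroy()), so dataset access goes through execute()
    try {
      getContext().execute(datasetContext -> {
        KeyValueTable workTable = datasetContext.getDataset("work-items");
        // KeyValueTable.read() returns bytes; convert to a String (null if absent)
        String workItem = Bytes.toString(workTable.read("next-item"));
        if (workItem != null) {
          processWorkItem(workItem);
          workTable.write("processed-item", workItem);
        }
        // The transaction commits when this block completes normally
      });
    } catch (TransactionFailureException e) {
      throw new RuntimeException("Failed to process work item", e);
    }
  }

  @Override
  @TransactionPolicy(TransactionControl.EXPLICIT)
  public void destroy() {
    // Cleanup without transaction overhead
    closeExternalConnections();
  }
}
```

Methods annotated with `EXPLICIT` manage their own transactions, using `Transactional.execute()` for controlled transactions.

When to use EXPLICIT control:
- Initialization or cleanup that may exceed the transaction timeout
- Setting up or closing connections to external systems
- Methods that need to run one or more separate transactions through `execute()`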
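The commit-or-rollback contract that `Transactional.execute()` gives a `TxRunnable` can be pictured with a toy, single-threaded model. This is a sketch, not CDAP code: `ToyTransactionDemo`, `InMemoryTransactional`, `ToyDatasetContext`, `ToyTxRunnable` and `ToyTransactionFailureException` are hypothetical stand-ins that only mirror the shapes of `Transactional`, `DatasetContext`, `TxRunnable` and `TransactionFailureException`. The copy-then-publish approach is a simplification; it has no isolation between concurrent callers and no conflict detection, unlike CDAP's Tephra-based transactions.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the execute()/TxRunnable contract. Every type here is a
// hypothetical stand-in, not a CDAP class.
public class ToyTransactionDemo {

  /** Stand-in for DatasetContext: exposes a single key-value "dataset". */
  interface ToyDatasetContext {
    Map<String, String> table();
  }

  /** Same shape as TxRunnable: one method that may throw any exception. */
  @FunctionalInterface
  interface ToyTxRunnable {
    void run(ToyDatasetContext context) throws Exception;
  }

  /** Checked exception, like TransactionFailureException. */
  static final class ToyTransactionFailureException extends Exception {
    ToyTransactionFailureException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Applies all of a runnable's writes on success and none of them on failure. */
  static final class InMemoryTransactional {
    private final Map<String, String> committed = new HashMap<>();

    void execute(ToyTxRunnable runnable) throws ToyTransactionFailureException {
      // Buffer writes in a private copy so a failure leaves committed state untouched
      Map<String, String> workingCopy = new HashMap<>(committed);
      try {
        runnable.run(() -> workingCopy);
      } catch (Exception e) {
        // Roll back by discarding the copy, then report the failure
        throw new ToyTransactionFailureException("transaction rolled back", e);
      }
      // Commit by publishing every buffered write at once
      committed.clear();
      committed.putAll(workingCopy);
    }

    Map<String, String> snapshot() {
      return new HashMap<>(committed);
    }
  }

  public static void main(String[] args) throws ToyTransactionFailureException {
    InMemoryTransactional tx = new InMemoryTransactional();
    tx.execute(ctx -> ctx.table().put("a", "1"));
    try {
      tx.execute(ctx -> {
        ctx.table().put("b", "2");
        throw new IllegalStateException("simulated failure");
      });
    } catch (ToyTransactionFailureException expected) {
      // The failed transaction's write to "b" was discarded
    }
    System.out.println(tx.snapshot()); // prints {a=1}
  }
}
```

The second call's write to `"b"` never becomes visible because the runnable threw, which is the same all-or-nothing behavior the usage examples rely on when they say a transaction "commits automatically on successful completion".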
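When to use IMPLICIT control (default):
- Lifecycle methods such as `initialize()` or `destroy()` that perform only short dataset operations
- Work that should commit or roll back as a single unit and fits within the transaction timeout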
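To illustrate how a runtime could honor the annotation, the sketch below reads a method-level policy reflectively and wraps only `IMPLICIT` methods in a transaction, treating unannotated methods as `IMPLICIT` (the default). `PolicyDispatchDemo`, `Policy`, `Control`, `invokeLifecycle` and `SampleProgram` are hypothetical names, and the `begin`/`commit`/`rollback` log entries stand in for real transaction calls; CDAP's own program runners may work differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Toy runtime that honors a method-level transaction policy. All names are
// hypothetical; they mirror TransactionPolicy/TransactionControl in shape only.
public class PolicyDispatchDemo {

  enum Control { IMPLICIT, EXPLICIT }

  @Retention(RetentionPolicy.RUNTIME) // must be RUNTIME to be visible via reflection
  @Target(ElementType.METHOD)
  @interface Policy {
    Control value();
  }

  /** Records what the toy runtime did around each call. */
  static final List<String> LOG = new ArrayList<>();

  /** Unannotated methods default to IMPLICIT. */
  static Control policyOf(Method method) {
    Policy policy = method.getAnnotation(Policy.class);
    return policy == null ? Control.IMPLICIT : policy.value();
  }

  /** Calls a public no-arg method, wrapping it in a "transaction" only when IMPLICIT. */
  static void invokeLifecycle(Object program, String methodName) throws Exception {
    Method method = program.getClass().getMethod(methodName);
    if (policyOf(method) == Control.EXPLICIT) {
      // EXPLICIT: the method starts its own transactions, e.g. via execute()
      method.invoke(program);
      return;
    }
    LOG.add("begin");
    try {
      method.invoke(program);
    } catch (Exception e) {
      LOG.add("rollback");
      throw e;
    }
    LOG.add("commit");
  }

  /** Hypothetical program with one lifecycle method of each kind. */
  public static class SampleProgram {
    public void initialize() {
      LOG.add("initialize");
    }

    @Policy(Control.EXPLICIT)
    public void destroy() {
      LOG.add("destroy");
    }
  }

  public static void main(String[] args) throws Exception {
    SampleProgram program = new SampleProgram();
    invokeLifecycle(program, "initialize");
    invokeLifecycle(program, "destroy");
    System.out.println(LOG); // prints [begin, initialize, commit, destroy]
  }
}
```

The `RUNTIME` retention is what makes this possible: an annotation with a shorter retention would not be returned by `getAnnotation`, so the runtime could not distinguish the two modes.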