The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.
—
CDAP provides a comprehensive data management framework with support for various dataset types, messaging systems, and data access patterns. The framework abstracts underlying storage technologies while providing consistent APIs for data operations across different storage systems.
The dataset framework is the foundation for data storage and access in CDAP, providing a unified abstraction layer over different storage systems.
import io.cdap.cdap.api.dataset.*;
import io.cdap.cdap.api.dataset.table.*;
// Base dataset interface
// Implemented by every CDAP dataset; close() frees resources held by this instance.
public interface Dataset {
// Releases any resources held by this dataset instance.
void close() throws IOException;
}
// Dataset context for accessing datasets
// Runtime access point for obtaining and returning dataset instances.
public interface DatasetContext {
// Returns the named dataset from the current namespace.
<T extends Dataset> T getDataset(String name) throws DataSetException;
// Returns the named dataset from an explicit namespace.
<T extends Dataset> T getDataset(String namespace, String name) throws DataSetException;
// Hands the dataset back to the context once the caller is done with it.
void releaseDataset(Dataset dataset);
// Discards the dataset instance. NOTE(review): the exact lifecycle difference from
// releaseDataset (reuse vs. immediate discard) is not visible here — confirm against CDAP docs.
void discardDataset(Dataset dataset);
}
// Dataset management interface
// Administrative operations on dataset instances (create/update/drop/truncate).
public interface DatasetManager {
// True if a dataset instance with the given name exists.
boolean datasetExists(String name) throws DataSetException;
// Returns the properties the named dataset was created/updated with.
DatasetProperties getDatasetProperties(String name) throws DataSetException;
// Creates a new dataset instance of the given type name.
void createDataset(String name, String type, DatasetProperties properties) throws DataSetException;
// Replaces the properties of an existing dataset instance.
void updateDataset(String name, DatasetProperties properties) throws DataSetException;
// Deletes the dataset instance together with its data.
void dropDataset(String name) throws DataSetException;
// Removes all data but keeps the dataset instance.
void truncateDataset(String name) throws DataSetException;
}
// Dataset configurer for application setup
// Used at application configure time to declare dataset instances, types and modules.
public interface DatasetConfigurer {
// Declares a dataset instance of a named type with explicit properties.
void createDataset(String datasetName, String typeName, DatasetProperties properties);
// Declares a dataset instance of a named type with default properties.
void createDataset(String datasetName, String typeName);
// Declares a dataset instance backed by the given Dataset class.
void createDataset(String datasetName, Class<? extends Dataset> datasetClass, DatasetProperties props);
// Declares a dataset instance backed by the given Dataset class, default properties.
void createDataset(String datasetName, Class<? extends Dataset> datasetClass);
// Registers a dataset module that provides custom dataset types.
void addDatasetModule(String moduleName, Class<? extends DatasetModule> moduleClass);
// Registers a single Dataset class as a dataset type.
void addDatasetType(Class<? extends Dataset> datasetClass);
}// Dataset properties container
// Immutable bag of string properties supplied when creating or updating a dataset.
// Method bodies below are placeholders in this reference, not real implementations.
public class DatasetProperties {
// Starts a new properties builder.
public static Builder builder() { return new Builder(); }
public Map<String, String> getProperties() { /* returns properties map */ }
// Fluent builder; add/addAll return this for chaining.
public static class Builder {
public Builder add(String key, String value) { /* add property */ return this; }
public Builder addAll(Map<String, String> properties) { /* add all properties */ return this; }
public DatasetProperties build() { /* build properties */ }
}
}
// Dataset specification
// Resolved, immutable description of a dataset instance: name, type, properties,
// and the specifications of any nested datasets it embeds.
public final class DatasetSpecification {
public String getName() { /* returns dataset name */ }
public String getType() { /* returns dataset type */ }
public Map<String, String> getProperties() { /* returns properties */ }
public Map<String, DatasetSpecification> getSpecifications() { /* returns nested specs */ }
}
// Dataset instantiation exception
// Unchecked exception thrown when a dataset instance cannot be created or loaded.
public class DatasetInstantiationException extends RuntimeException {
public DatasetInstantiationException(String message) { super(message); }
// Preserves the underlying cause for diagnostics.
public DatasetInstantiationException(String message, Throwable cause) { super(message, cause); }
}The Table dataset provides a flexible, schema-free NoSQL storage abstraction with support for complex queries and batch operations.
import io.cdap.cdap.api.data.batch.*;
import io.cdap.cdap.api.dataset.table.*;
import java.nio.charset.StandardCharsets;
// Table interface - core NoSQL storage
// Schema-free row/column store keyed by byte[] row keys. Supports gets, puts,
// deletes, scans, atomic increments and compare-and-swap.
@Deprecated // Note: table based datasets will be removed in a future version
public interface Table extends BatchReadable<byte[], Row>, BatchWritable<byte[], Put>,
Dataset, RecordScannable<StructuredRecord>, RecordWritable<StructuredRecord> {
String TYPE = "table";
// Table properties: cell time-to-live, readless-increment mode, conflict detection level.
String PROPERTY_TTL = "dataset.table.ttl";
String PROPERTY_READLESS_INCREMENT = "dataset.table.readless.increment";
String PROPERTY_CONFLICT_DETECTION = "dataset.table.conflict.detection";
// Basic operations
Row get(Get get);
Row get(byte[] row);
Row get(byte[] row, byte[][] columns);
// Reads a bounded column range within one row, up to 'limit' columns.
Row get(byte[] row, byte[] startColumn, byte[] stopColumn, int limit);
void put(Put put);
void put(byte[] row, byte[] column, byte[] value);
void put(byte[] row, byte[][] columns, byte[][] values);
boolean delete(Delete delete);
void delete(byte[] row);
void delete(byte[] row, byte[] column);
void delete(byte[] row, byte[][] columns);
// Scanning operations
Scanner scan(Scan scan);
// Scans rows from startRow to stopRow; presumably a half-open range with null
// meaning unbounded — confirm against the CDAP Table Javadoc.
Scanner scan(byte[] startRow, byte[] stopRow);
// Increment operations — atomically add to numeric column values.
Row increment(Increment increment);
long increment(byte[] row, byte[] column, long amount);
Row increment(byte[] row, byte[][] columns, long[] amounts);
// Batch operations — BatchWritable entry point used by batch jobs.
void write(byte[] key, Put value) throws IOException;
// Compare and swap operations
// Atomically replaces the cell only if its current value equals expectedValue.
boolean compareAndSwap(byte[] row, byte[] column, byte[] expectedValue, byte[] newValue);
}
// Row representation
// Read-only view of one table row: the row key plus its column/value map.
public interface Row {
byte[] getRow();
// Column access
boolean isEmpty();
int size();
Map<byte[], byte[]> getColumns();
byte[] get(byte[] column);
byte[] get(String column);
// Typed access methods
// Boxed return types — presumably null when the column is absent; confirm in CDAP docs.
Boolean getBoolean(byte[] column);
Boolean getBoolean(String column);
Integer getInt(byte[] column);
Integer getInt(String column);
Long getLong(byte[] column);
Long getLong(String column);
Double getDouble(byte[] column);
Double getDouble(String column);
String getString(byte[] column);
String getString(String column);
}// Basic table operations
/**
 * HTTP service handler exposing CRUD-style operations over the
 * "user_profiles" Table dataset: fetch a profile, create a profile,
 * and record a login event.
 */
public class UserProfileService extends AbstractHttpServiceHandler {

  @UseDataSet("user_profiles")
  private Table userProfiles;

  /**
   * Returns the profile for {@code userId} as JSON, or 404 when no row exists.
   */
  @GET
  @Path("/user/{id}")
  public void getUser(HttpServiceRequest request, HttpServiceResponder responder,
                      @PathParam("id") String userId) {
    try {
      Row row = userProfiles.get(Bytes.toBytes(userId));
      if (row.isEmpty()) {
        responder.sendError(404, "User not found");
        return;
      }
      // Build user profile JSON from the stored columns.
      JsonObject profile = new JsonObject();
      profile.addProperty("id", userId);
      profile.addProperty("name", row.getString("name"));
      profile.addProperty("email", row.getString("email"));
      profile.addProperty("created", row.getLong("created"));
      profile.addProperty("lastLogin", row.getLong("lastLogin"));
      responder.sendJson(200, profile);
    } catch (Exception e) {
      // Boundary handler: translate any failure into a 500 response.
      responder.sendError(500, "Error retrieving user: " + e.getMessage());
    }
  }

  /**
   * Creates a user row from a JSON body with "id", "name" and "email" fields.
   * Malformed or incomplete input yields a 400 response via the catch below.
   */
  @POST
  @Path("/user")
  public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {
    try {
      // Decode the request body explicitly as UTF-8 rather than via
      // Charset.forName("UTF-8") — same behavior, no string lookup.
      String content = StandardCharsets.UTF_8.decode(
          ByteBuffer.wrap(request.getContent())).toString();
      JsonObject userJson = new JsonParser().parse(content).getAsJsonObject();
      String userId = userJson.get("id").getAsString();
      String name = userJson.get("name").getAsString();
      String email = userJson.get("email").getAsString();
      // Write all columns for the new user in a single put.
      Put put = new Put(Bytes.toBytes(userId));
      put.add("name", name);
      put.add("email", email);
      put.add("created", System.currentTimeMillis());
      put.add("lastLogin", 0L);
      put.add("status", "active");
      userProfiles.put(put);
      responder.sendString(201, "User created successfully", "text/plain");
    } catch (Exception e) {
      responder.sendError(400, "Error creating user: " + e.getMessage());
    }
  }

  /**
   * Records a login: increments the "loginCount" counter and stores the
   * current time in "lastLogin".
   */
  @PUT
  @Path("/user/{id}/login")
  public void recordLogin(HttpServiceRequest request, HttpServiceResponder responder,
                          @PathParam("id") String userId) {
    try {
      // NOTE(review): no existence check — logins for unknown ids create a row; confirm intended.
      userProfiles.increment(Bytes.toBytes(userId), Bytes.toBytes("loginCount"), 1L);
      userProfiles.put(Bytes.toBytes(userId), Bytes.toBytes("lastLogin"),
          Bytes.toBytes(System.currentTimeMillis()));
      responder.sendString(200, "Login recorded", "text/plain");
    } catch (Exception e) {
      responder.sendError(500, "Error recording login: " + e.getMessage());
    }
  }
}
// Complex table scanning and filtering
/**
 * MapReduce program that computes per-user activity statistics from the
 * user profile table and buckets active users by how recently they logged in.
 */
public class UserAnalyticsMapReduce extends AbstractMapReduce {

  public static class UserStatsMapper extends Mapper<byte[], Row, Text, UserStats> {

    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    /** Buckets a days-since-last-login value into an activity level key. */
    static String activityLevel(long daysSinceLogin) {
      if (daysSinceLogin <= 7) {
        return "highly_active";
      }
      if (daysSinceLogin <= 30) {
        return "moderately_active";
      }
      if (daysSinceLogin <= 90) {
        return "low_activity";
      }
      return "inactive";
    }

    @Override
    protected void map(byte[] key, Row row, Context context)
        throws IOException, InterruptedException {
      String userId = Bytes.toString(key);
      // Only active users contribute; equals() on the constant is null-safe
      // for rows that lack a status column.
      if (!"active".equals(row.getString("status"))) {
        return;
      }
      Long created = row.getLong("created");
      Long lastLogin = row.getLong("lastLogin");
      Long loginCount = row.getLong("loginCount");
      UserStats stats = new UserStats();
      stats.setUserId(userId);
      stats.setCreated(created != null ? created : 0L);
      stats.setLastLogin(lastLogin != null ? lastLogin : 0L);
      stats.setLoginCount(loginCount != null ? loginCount : 0L);
      // Capture "now" once so both derived metrics use the same reference time
      // (the original called System.currentTimeMillis() twice, which can differ).
      long now = System.currentTimeMillis();
      long daysSinceCreation = (now - stats.getCreated()) / MILLIS_PER_DAY;
      long daysSinceLogin = (now - stats.getLastLogin()) / MILLIS_PER_DAY;
      stats.setDaysSinceCreation(daysSinceCreation);
      stats.setDaysSinceLogin(daysSinceLogin);
      context.write(new Text(activityLevel(daysSinceLogin)), stats);
    }
  }
}
A simplified key-value storage interface built on top of Table:
import io.cdap.cdap.api.dataset.lib.*;
// Key-Value table interface
// Single-value-per-key convenience abstraction built on Table.
@Deprecated // table based datasets will be removed in a future version
public interface KeyValueTable extends BatchReadable<byte[], KeyValue<byte[], byte[]>>,
BatchWritable<byte[], byte[]>, Dataset {
String TYPE = "keyValueTable";
// Basic operations
// Reads return null when the key is absent (see @Nullable).
@ReadOnly
@Nullable
byte[] read(String key);
@ReadOnly
@Nullable
byte[] read(byte[] key);
@WriteOnly
void write(String key, String value);
@WriteOnly
void write(String key, byte[] value);
@WriteOnly
void write(byte[] key, byte[] value);
// Atomically adds 'amount' to the stored numeric value.
@WriteOnly
void increment(byte[] key, long amount);
@WriteOnly
void increment(String key, long amount);
@WriteOnly
void delete(String key);
@WriteOnly
void delete(byte[] key);
// Batch operations
@ReadOnly
Map<byte[], byte[]> readAll(byte[][] keys);
@WriteOnly
void writeAll(Map<byte[], byte[]> entries);
@WriteOnly
void deleteAll(byte[][] keys);
}
// Key-Value pair representation
/**
 * Immutable key/value pair, e.g. as returned by batch reads of a key-value table.
 * The original reference listing had placeholder bodies; this is the complete,
 * behavior-equivalent implementation.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class KeyValue<K, V> {

  private final K key;
  private final V value;

  /** Creates a pair holding the given key and value (either may be null). */
  public KeyValue(K key, V value) {
    this.key = key;
    this.value = value;
  }

  /** @return the key of this pair */
  public K getKey() {
    return key;
  }

  /** @return the value of this pair */
  public V getValue() {
    return value;
  }
}
// Usage example
// Demonstrates typical KeyValueTable usage: single reads/writes, atomic
// counters, and a batch read decoded to strings.
public class ConfigurationStore {
private KeyValueTable configTable;
// Persists one configuration entry.
public void storeConfiguration(String key, String value) {
configTable.write(key, value);
}
// Returns the stored value as a string, or null when the key is absent.
public String getConfiguration(String key) {
byte[] raw = configTable.read(key);
if (raw == null) {
return null;
}
return Bytes.toString(raw);
}
// Atomically adds the given delta to a named counter.
public void updateCounter(String counterName, long increment) {
configTable.increment(counterName, increment);
}
// Batch-reads the given keys and decodes both keys and values as strings.
public Map<String, String> getAllConfigurations(String[] keys) {
byte[][] encodedKeys = Arrays.stream(keys)
.map(Bytes::toBytes)
.toArray(byte[][]::new);
return configTable.readAll(encodedKeys)
.entrySet()
.stream()
.collect(Collectors.toMap(
e -> Bytes.toString(e.getKey()),
e -> Bytes.toString(e.getValue())));
}
}
CDAP provides several file-based dataset types for working with HDFS and other file systems:
import io.cdap.cdap.api.dataset.lib.*;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
// FileSet interface for file-based operations
// Dataset backed by a directory of files; exposes Locations plus the Hadoop
// input/output format configuration used by batch jobs.
public interface FileSet extends Dataset, BatchReadable<Void, Location>, BatchWritable<Void, Location> {
String TYPE = "fileSet";
// File operations
// Resolves a path relative to this fileset's base location.
Location getLocation(String relativePath) throws IOException;
Location getBaseLocation() throws IOException;
// Input/Output format configuration
Map<String, String> getInputFormatConfiguration();
Map<String, String> getOutputFormatConfiguration();
// Runtime arguments access
Map<String, String> getRuntimeArguments();
}
// File location abstraction
// Filesystem-agnostic handle to a file or directory (HDFS, local FS, ...).
public interface Location {
String getName();
URI toURI();
boolean exists() throws IOException;
boolean isDirectory() throws IOException;
long lastModified() throws IOException;
long length() throws IOException;
// Stream operations
InputStream getInputStream() throws IOException;
OutputStream getOutputStream() throws IOException;
// Opens an output stream with the given permission string — presumably a
// POSIX-style permission spec; confirm against the Location implementation.
OutputStream getOutputStream(String permission) throws IOException;
// Directory operations
boolean mkdirs() throws IOException;
List<Location> list() throws IOException;
boolean delete() throws IOException;
boolean delete(boolean recursive) throws IOException;
// Path operations — return a child location without creating it.
Location append(String child) throws IOException;
Location append(Path child) throws IOException;
}
// FileSet properties and arguments
// Builder for the dataset properties that configure a FileSet's Hadoop
// input/output formats. Bodies below are placeholders in this reference.
public final class FileSetProperties {
public static final String INPUT_FORMAT = "input.format";
public static final String OUTPUT_FORMAT = "output.format";
public static final String INPUT_PROPERTIES_PREFIX = "input.properties.";
public static final String OUTPUT_PROPERTIES_PREFIX = "output.properties.";
public static Builder builder() { return new Builder(); }
public static class Builder {
public Builder setInputFormat(Class<? extends InputFormat> inputFormat) { /* set input format */ return this; }
public Builder setOutputFormat(Class<? extends OutputFormat> outputFormat) { /* set output format */ return this; }
public Builder setInputProperty(String key, String value) { /* set input property */ return this; }
public Builder setOutputProperty(String key, String value) { /* set output property */ return this; }
public DatasetProperties build() { /* build properties */ }
}
}// Partitioned FileSet for organizing files by partitions
// FileSet whose files are organized into partitions addressed by a PartitionKey.
public interface PartitionedFileSet extends Dataset,
BatchReadable<PartitionKey, PartitionDetail>,
BatchWritable<PartitionKey, PartitionOutput> {
String TYPE = "partitionedFileSet";
// Partition operations
PartitionDetail getPartition(PartitionKey key);
Set<PartitionDetail> getPartitions(PartitionFilter filter);
// Registers an existing path as a partition for the given key.
void addPartition(PartitionKey key, String path);
void addPartition(PartitionKey key, String path, Map<String, String> metadata);
void dropPartition(PartitionKey key);
// Output operations
// Obtains a writer handle for producing a new partition's files.
PartitionOutput getPartitionOutput(PartitionKey key);
Location getLocation(PartitionKey key);
// FileSet operations — access to the underlying, unpartitioned FileSet.
FileSet getEmbeddedFileSet();
}
// Partition key for organizing data
// Immutable set of typed field values identifying a single partition.
// Bodies below are placeholders in this reference.
public class PartitionKey {
public static Builder builder() { return new Builder(); }
public Map<String, Comparable<?>> getFields() { /* returns partition fields */ }
public static class Builder {
public Builder addField(String name, Comparable<?> value) { /* add partition field */ return this; }
public Builder addStringField(String name, String value) { /* add string field */ return this; }
public Builder addIntField(String name, int value) { /* add int field */ return this; }
public Builder addLongField(String name, long value) { /* add long field */ return this; }
public PartitionKey build() { /* build partition key */ }
}
}
// Partition metadata and details
// Read-only view of a registered partition: key, path, location and metadata.
public interface PartitionDetail {
PartitionKey getPartitionKey();
String getRelativePath();
Location getLocation() throws IOException;
Map<String, String> getMetadata();
long getLastModified();
}
// Partitioning strategy
// Declares the ordered, typed fields that make up a fileset's partition keys.
// Builder bodies are placeholders in this reference.
public abstract class Partitioning {
public static Builder builder() { return new Builder(); }
public static class Builder {
public Builder addField(String name, Partitioning.FieldType type) { /* add field */ return this; }
public Builder addStringField(String name) { /* add string field */ return this; }
public Builder addIntField(String name) { /* add int field */ return this; }
public Builder addLongField(String name) { /* add long field */ return this; }
public Partitioning build() { /* build partitioning */ }
}
// Supported partition field types.
public enum FieldType {
STRING, INT, LONG
}
}// Time-based partitioning for time-series data
// Partitioned storage whose partitions are addressed by a timestamp (millis).
public interface TimePartitionedFileSet extends Dataset,
BatchReadable<Long, TimePartitionDetail>,
BatchWritable<Long, TimePartitionOutput> {
String TYPE = "timePartitionedFileSet";
// Time partition operations
TimePartitionDetail getPartitionByTime(long time);
// Returns partitions in the [startTime, endTime] window — confirm bound inclusivity.
Set<TimePartitionDetail> getPartitionsByTime(long startTime, long endTime);
TimePartitionOutput getPartitionOutput(long time);
// Partition management
void addPartition(long time, String path);
void addPartition(long time, String path, Map<String, String> metadata);
void dropPartition(long time);
// FileSet operations
// NOTE(review): returns PartitionedFileSet although the section comment says
// "FileSet" — verify the intended return type against the CDAP API.
PartitionedFileSet getEmbeddedFileSet();
}
// Time partition representation
// One time-addressed partition: its timestamp, relative path and location.
public interface TimePartition {
long getTime();
String getRelativePath();
Location getLocation() throws IOException;
}
// Usage example for ETL processing
// Workflow that ingests a daily time partition, transforms it via MapReduce,
// and then runs a partition cleanup action.
public class DailyETLWorkflow extends AbstractWorkflow {
@Override
public void configure(WorkflowConfigurer configurer) {
configurer.setName("DailyETLWorkflow");
// Add time-partitioned datasets for daily processing
configurer.addAction(new DataIngestionAction());
configurer.addMapReduce("DataTransformationMapReduce");
configurer.addAction(new PartitionCleanupAction());
}
// Custom action that writes today's data into a new time partition of "raw_data".
public static class DataIngestionAction extends AbstractCustomAction {
@Override
public void run(CustomActionContext context) throws Exception {
TimePartitionedFileSet rawData = context.getDataset("raw_data");
// Get today's partition
long today = DateUtils.truncateToDay(System.currentTimeMillis());
TimePartitionOutput output = rawData.getPartitionOutput(today);
// Ingest data for today's partition
Location outputLocation = output.getLocation();
// try-with-resources guarantees the partition file is closed even on failure.
try (OutputStream os = outputLocation.getOutputStream()) {
// Write ingested data to partition
ingestDailyData(os);
}
// Add partition with metadata — registered only after the data is fully written.
Map<String, String> metadata = new HashMap<>();
metadata.put("ingestion.timestamp", String.valueOf(System.currentTimeMillis()));
metadata.put("source", "daily-feed");
output.addPartition(metadata);
}
// Placeholder: writes the day's ingested records to the partition stream.
private void ingestDailyData(OutputStream outputStream) throws IOException {
// Implementation for data ingestion
}
}
}CDAP provides a transactional messaging system for reliable message passing and stream processing:
import io.cdap.cdap.api.messaging.*;
import io.cdap.cdap.api.*;
import java.nio.charset.StandardCharsets;
// Message publisher interface
// Publishes messages to a topic; string payloads are encoded with the given
// charset where one is supplied (default charset for the others — confirm which).
@Beta
public interface MessagePublisher {
// Publish single message
void publish(String namespace, String topic, String payload) throws TopicNotFoundException, IOException, AccessException;
void publish(String namespace, String topic, String payload, Charset charset) throws TopicNotFoundException, IOException, AccessException;
void publish(String namespace, String topic, byte[] payload) throws TopicNotFoundException, IOException, AccessException;
// Publish multiple messages
// NOTE(review): this overload takes the charset as a String while the next
// takes a Charset — looks like a transcription inconsistency; verify against
// the actual CDAP MessagePublisher API.
void publish(String namespace, String topic, String charset, String... payloads) throws TopicNotFoundException, IOException, AccessException;
void publish(String namespace, String topic, Charset charset, String... payloads) throws TopicNotFoundException, IOException, AccessException;
void publish(String namespace, String topic, byte[]... payloads) throws TopicNotFoundException, IOException, AccessException;
void publish(String namespace, String topic, Iterator<byte[]> payloads) throws TopicNotFoundException, IOException, AccessException;
}
// Message fetcher interface
// Reads messages from a topic, either after a known message id or by time range.
// Returned iterators must be closed by the caller.
@Beta
public interface MessageFetcher {
// Fetch messages with limit
// NOTE(review): afterMessageId is a long here while Message.getId() returns a
// String — confirm the id representation against the CDAP messaging API.
CloseableIterator<Message> fetch(String namespace, String topic, int limit, long afterMessageId)
throws TopicNotFoundException, IOException, AccessException;
// Fetch messages with time range
CloseableIterator<Message> fetch(String namespace, String topic, int limit, long startTime, long endTime)
throws TopicNotFoundException, IOException, AccessException;
}
// Message representation
// A single fetched message: id, raw payload, publish timestamp and headers.
public interface Message {
String getId();
byte[] getPayload();
long getPublishTimestamp();
Map<String, String> getHeaders();
}
// Messaging administration
// Topic lifecycle management: create, update, delete, list and describe topics.
@Beta
public interface MessagingAdmin {
void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException, AccessException;
void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException, AccessException;
void deleteTopic(String namespace, String topic) throws TopicNotFoundException, IOException, AccessException;
List<TopicMetadata> listTopics(String namespace) throws IOException, AccessException;
TopicMetadata getTopic(String namespace, String topic) throws TopicNotFoundException, IOException, AccessException;
}
// Topic metadata
// Immutable description of a topic: namespace, name, properties and generation.
// Method bodies are placeholders in this reference.
public class TopicMetadata {
public static Builder builder(String topic) { return new Builder(topic); }
public String getTopic() { /* returns topic name */ }
public String getNamespace() { /* returns namespace */ }
public Map<String, String> getProperties() { /* returns properties */ }
public int getGeneration() { /* returns generation */ }
public static class Builder {
public Builder setNamespace(String namespace) { /* set namespace */ return this; }
public Builder setDescription(String description) { /* set description */ return this; }
public Builder setProperty(String key, String value) { /* set property */ return this; }
public Builder setProperties(Map<String, String> properties) { /* set properties */ return this; }
public TopicMetadata build() { /* build metadata */ }
}
}// Messaging context for accessing messaging APIs
// Entry point handed to running programs for publisher, fetcher and admin access.
public interface MessagingContext {
MessagePublisher getMessagePublisher();
MessageFetcher getMessageFetcher();
MessagingAdmin getMessagingAdmin();
}
// Usage in worker programs
// Long-running worker that polls a topic, processes each message, republishes
// results, and checkpoints the last processed message id.
public class MessageProcessingWorker extends AbstractWorker {
@Override
public void configure(WorkerConfigurer configurer) {
configurer.setName("MessageProcessor");
configurer.setDescription("Processes messages from topic");
}
@Override
public void run() throws Exception {
WorkerContext context = getContext();
MessagingContext messagingContext = context.getMessagingContext();
MessageFetcher fetcher = messagingContext.getMessageFetcher();
MessagePublisher publisher = messagingContext.getMessagePublisher();
String namespace = context.getNamespace();
long lastProcessedId = getLastProcessedMessageId();
// Poll loop: fetch up to 100 messages per pass until the program stops running.
while (context.getState().equals(ProgramRunStatus.RUNNING)) {
try (CloseableIterator<Message> messages =
fetcher.fetch(namespace, "input-topic", 100, lastProcessedId)) {
while (messages.hasNext()) {
Message message = messages.next();
// Process message
ProcessedMessage processed = processMessage(message);
// Publish result
if (processed != null) {
publisher.publish(namespace, "output-topic", processed.toJson());
}
// NOTE(review): assumes message ids are numeric strings — Message.getId()
// returns an opaque String; confirm parseLong is safe here.
lastProcessedId = Long.parseLong(message.getId());
}
}
// Save checkpoint
// Written only after the batch completes, so a crash mid-batch may reprocess messages.
saveLastProcessedMessageId(lastProcessedId);
// Sleep before next fetch
Thread.sleep(1000);
}
}
// Transforms one message; a null return means "skip publishing".
private ProcessedMessage processMessage(Message message) {
// Implementation for message processing
return new ProcessedMessage(new String(message.getPayload(), StandardCharsets.UTF_8));
}
// Reads the last checkpoint; placeholder returns 0 (start of topic).
private long getLastProcessedMessageId() {
// Implementation to retrieve last processed message ID
return 0L;
}
// Persists the checkpoint; placeholder implementation.
private void saveLastProcessedMessageId(long messageId) {
// Implementation to save checkpoint
}
}
// Usage in MapReduce for batch message processing
// Batch job that counts messages per eventType and writes the totals to a table.
public class MessageBatchProcessor extends AbstractMapReduce {
@Override
public void initialize(MapReduceContext context) throws Exception {
Job job = context.getHadoopJob();
// Configure to read from messaging system
MessagingUtils.configureInput(job, context.getNamespace(), "batch-topic");
// Configure output dataset
context.setOutput(Output.ofDataset("processed_messages"));
job.setMapperClass(MessageMapper.class);
job.setReducerClass(MessageAggregator.class);
}
// Emits (eventType, 1) for every JSON message payload.
public static class MessageMapper extends Mapper<LongWritable, Message, Text, IntWritable> {
@Override
protected void map(LongWritable key, Message message, Context context)
throws IOException, InterruptedException {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
JsonObject json = new JsonParser().parse(payload).getAsJsonObject();
String eventType = json.get("eventType").getAsString();
context.write(new Text(eventType), new IntWritable(1));
}
}
// Sums the counts per eventType and writes one "stats" row to the output table.
public static class MessageAggregator extends Reducer<Text, IntWritable, byte[], Put> {
@Override
protected void reduce(Text eventType, Iterable<IntWritable> counts, Context context)
throws IOException, InterruptedException {
int total = 0;
for (IntWritable count : counts) {
total += count.get();
}
Put put = new Put(Bytes.toBytes(eventType.toString()));
put.add("stats", "count", total);
put.add("stats", "timestamp", System.currentTimeMillis());
context.write(Bytes.toBytes(eventType.toString()), put);
}
}
}// Custom dataset definition
// Convenience base class for dataset definitions that stores the type name.
public abstract class AbstractDatasetDefinition<T extends Dataset>
implements DatasetDefinition<T> {
private final String name;
public AbstractDatasetDefinition(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
// Produces the specification for a new instance from the given properties.
@Override
public abstract DatasetSpecification configure(String instanceName, DatasetProperties properties);
// Instantiates the dataset from its specification at runtime.
@Override
public abstract T getDataset(DatasetContext datasetContext, DatasetSpecification spec,
Map<String, String> arguments, ClassLoader classLoader) throws IOException;
}
// Dataset state persistence
// Simple named byte[]-blob persistence for dataset state.
public interface DatasetStatePersistor {
void persistState(String name, byte[] state) throws IOException;
byte[] readState(String name) throws IOException;
void deleteState(String name) throws IOException;
}
// Composite dataset for combining multiple datasets
// Base for definitions whose dataset embeds several constituent datasets.
public abstract class CompositeDatasetDefinition<T extends Dataset>
extends AbstractDatasetDefinition<T> {
protected CompositeDatasetDefinition(String name) {
super(name);
}
// Methods for managing constituent datasets
// Maps each constituent's name to its specification, derived from the properties.
protected abstract Map<String, DatasetSpecification> getConstituentDatasets(DatasetProperties properties);
}The CDAP data management framework provides a comprehensive, abstracted approach to data storage and access, enabling applications to work with various storage systems through consistent APIs while maintaining enterprise-grade features like transactions, security, and operational control.
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap