CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap

The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.

Pending
Overview
Eval results
Files

docs/data-management.md

Data Management

CDAP provides a comprehensive data management framework with support for various dataset types, messaging systems, and data access patterns. The framework abstracts underlying storage technologies while providing consistent APIs for data operations across different storage systems.

Dataset Framework

The dataset framework is the foundation for data storage and access in CDAP, providing a unified abstraction layer over different storage systems.

Core Dataset Interfaces

import io.cdap.cdap.api.dataset.*;
import io.cdap.cdap.api.dataset.table.*;

// Base dataset interface
public interface Dataset {
    void close() throws IOException;
}

// Dataset context for accessing datasets
public interface DatasetContext {
    <T extends Dataset> T getDataset(String name) throws DataSetException;
    <T extends Dataset> T getDataset(String namespace, String name) throws DataSetException;
    void releaseDataset(Dataset dataset);
    void discardDataset(Dataset dataset);
}

// Dataset management interface
public interface DatasetManager {
    boolean datasetExists(String name) throws DataSetException;
    DatasetProperties getDatasetProperties(String name) throws DataSetException;
    void createDataset(String name, String type, DatasetProperties properties) throws DataSetException;
    void updateDataset(String name, DatasetProperties properties) throws DataSetException;
    void dropDataset(String name) throws DataSetException;
    void truncateDataset(String name) throws DataSetException;
}

// Dataset configurer for application setup
public interface DatasetConfigurer {
    void createDataset(String datasetName, String typeName, DatasetProperties properties);
    void createDataset(String datasetName, String typeName);
    void createDataset(String datasetName, Class<? extends Dataset> datasetClass, DatasetProperties props);
    void createDataset(String datasetName, Class<? extends Dataset> datasetClass);
    void addDatasetModule(String moduleName, Class<? extends DatasetModule> moduleClass);
    void addDatasetType(Class<? extends Dataset> datasetClass);
}

Dataset Properties and Configuration

// Dataset properties container
public class DatasetProperties {
    public static Builder builder() { return new Builder(); }
    public Map<String, String> getProperties() { /* returns properties map */ }
    
    public static class Builder {
        public Builder add(String key, String value) { /* add property */ return this; }
        public Builder addAll(Map<String, String> properties) { /* add all properties */ return this; }
        public DatasetProperties build() { /* build properties */ }
    }
}

// Dataset specification
public final class DatasetSpecification {
    public String getName() { /* returns dataset name */ }
    public String getType() { /* returns dataset type */ }
    public Map<String, String> getProperties() { /* returns properties */ }
    public Map<String, DatasetSpecification> getSpecifications() { /* returns nested specs */ }
}

// Dataset instantiation exception
public class DatasetInstantiationException extends RuntimeException {
    public DatasetInstantiationException(String message) { super(message); }
    public DatasetInstantiationException(String message, Throwable cause) { super(message, cause); }
}

Table Dataset

The Table dataset provides a flexible, schema-free NoSQL storage abstraction with support for complex queries and batch operations.

Table Interface and Operations

import io.cdap.cdap.api.data.batch.*;
import io.cdap.cdap.api.dataset.table.*;

import java.nio.charset.StandardCharsets;

// Table interface - core NoSQL storage
@Deprecated // Note: table based datasets will be removed in a future version
public interface Table extends BatchReadable<byte[], Row>, BatchWritable<byte[], Put>,
                              Dataset, RecordScannable<StructuredRecord>, RecordWritable<StructuredRecord> {
    
    String TYPE = "table";
    
    // Table properties
    String PROPERTY_TTL = "dataset.table.ttl";
    String PROPERTY_READLESS_INCREMENT = "dataset.table.readless.increment";
    String PROPERTY_CONFLICT_DETECTION = "dataset.table.conflict.detection";
    
    // Basic operations
    Row get(Get get);
    Row get(byte[] row);
    Row get(byte[] row, byte[][] columns);
    Row get(byte[] row, byte[] startColumn, byte[] stopColumn, int limit);
    
    void put(Put put);
    void put(byte[] row, byte[] column, byte[] value);
    void put(byte[] row, byte[][] columns, byte[][] values);
    
    boolean delete(Delete delete);
    void delete(byte[] row);
    void delete(byte[] row, byte[] column);
    void delete(byte[] row, byte[][] columns);
    
    // Scanning operations
    Scanner scan(Scan scan);
    Scanner scan(byte[] startRow, byte[] stopRow);
    
    // Increment operations
    Row increment(Increment increment);
    long increment(byte[] row, byte[] column, long amount);
    Row increment(byte[] row, byte[][] columns, long[] amounts);
    
    // Batch operations
    void write(byte[] key, Put value) throws IOException;
    
    // Compare and swap operations
    boolean compareAndSwap(byte[] row, byte[] column, byte[] expectedValue, byte[] newValue);
}

// Row representation
public interface Row {
    byte[] getRow();
    
    // Column access
    boolean isEmpty();
    int size();
    Map<byte[], byte[]> getColumns();
    byte[] get(byte[] column);
    byte[] get(String column);
    
    // Typed access methods
    Boolean getBoolean(byte[] column);
    Boolean getBoolean(String column);
    Integer getInt(byte[] column);
    Integer getInt(String column);  
    Long getLong(byte[] column);
    Long getLong(String column);
    Double getDouble(byte[] column);
    Double getDouble(String column);
    String getString(byte[] column);
    String getString(String column);
}

Table Operations Examples

// Basic table operations
public class UserProfileService extends AbstractHttpServiceHandler {
    
    @UseDataSet("user_profiles")
    private Table userProfiles;
    
    @GET
    @Path("/user/{id}")
    public void getUser(HttpServiceRequest request, HttpServiceResponder responder,
                       @PathParam("id") String userId) {
        try {
            Row row = userProfiles.get(Bytes.toBytes(userId));
            if (row.isEmpty()) {
                responder.sendError(404, "User not found");
                return;
            }
            
            // Build user profile JSON
            JsonObject profile = new JsonObject();
            profile.addProperty("id", userId);
            profile.addProperty("name", row.getString("name"));
            profile.addProperty("email", row.getString("email"));
            profile.addProperty("created", row.getLong("created"));
            profile.addProperty("lastLogin", row.getLong("lastLogin"));
            
            responder.sendJson(200, profile);
        } catch (Exception e) {
            responder.sendError(500, "Error retrieving user: " + e.getMessage());
        }
    }
    
    @POST
    @Path("/user")
    public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {
        try {
            String content = Charset.forName("UTF-8").decode(
                ByteBuffer.wrap(request.getContent())).toString();
            JsonObject userJson = new JsonParser().parse(content).getAsJsonObject();
            
            String userId = userJson.get("id").getAsString();
            String name = userJson.get("name").getAsString();
            String email = userJson.get("email").getAsString();
            
            // Create put operation
            Put put = new Put(Bytes.toBytes(userId));
            put.add("name", name);
            put.add("email", email);
            put.add("created", System.currentTimeMillis());
            put.add("lastLogin", 0L);
            put.add("status", "active");
            
            userProfiles.put(put);
            responder.sendString(201, "User created successfully", "text/plain");
            
        } catch (Exception e) {
            responder.sendError(400, "Error creating user: " + e.getMessage());
        }
    }
    
    @PUT
    @Path("/user/{id}/login")
    public void recordLogin(HttpServiceRequest request, HttpServiceResponder responder,
                           @PathParam("id") String userId) {
        try {
            // Use increment for login count and update last login time
            userProfiles.increment(Bytes.toBytes(userId), Bytes.toBytes("loginCount"), 1L);
            userProfiles.put(Bytes.toBytes(userId), Bytes.toBytes("lastLogin"), 
                           Bytes.toBytes(System.currentTimeMillis()));
            
            responder.sendString(200, "Login recorded", "text/plain");
        } catch (Exception e) {
            responder.sendError(500, "Error recording login: " + e.getMessage());
        }
    }
}

// Complex table scanning and filtering
public class UserAnalyticsMapReduce extends AbstractMapReduce {
    
    public static class UserStatsMapper extends Mapper<byte[], Row, Text, UserStats> {
        
        @Override
        protected void map(byte[] key, Row row, Context context) 
            throws IOException, InterruptedException {
            
            String userId = Bytes.toString(key);
            String status = row.getString("status");
            Long created = row.getLong("created");
            Long lastLogin = row.getLong("lastLogin");
            Long loginCount = row.getLong("loginCount");
            
            if (status != null && status.equals("active")) {
                UserStats stats = new UserStats();
                stats.setUserId(userId);
                stats.setCreated(created != null ? created : 0L);
                stats.setLastLogin(lastLogin != null ? lastLogin : 0L);
                stats.setLoginCount(loginCount != null ? loginCount : 0L);
                
                // Calculate activity metrics
                long daysSinceCreation = (System.currentTimeMillis() - stats.getCreated()) / (24 * 60 * 60 * 1000);
                long daysSinceLogin = (System.currentTimeMillis() - stats.getLastLogin()) / (24 * 60 * 60 * 1000);
                
                stats.setDaysSinceCreation(daysSinceCreation);
                stats.setDaysSinceLogin(daysSinceLogin);
                
                // Categorize user activity level
                String activityLevel;
                if (daysSinceLogin <= 7) {
                    activityLevel = "highly_active";
                } else if (daysSinceLogin <= 30) {
                    activityLevel = "moderately_active";
                } else if (daysSinceLogin <= 90) {
                    activityLevel = "low_activity";
                } else {
                    activityLevel = "inactive";
                }
                
                context.write(new Text(activityLevel), stats);
            }
        }
    }
}

Key-Value Table

A simplified key-value storage interface built on top of Table:

import io.cdap.cdap.api.dataset.lib.*;

// Key-Value table interface
@Deprecated // table based datasets will be removed in a future version
public interface KeyValueTable extends BatchReadable<byte[], KeyValue<byte[], byte[]>>,
                                      BatchWritable<byte[], byte[]>, Dataset {
    
    String TYPE = "keyValueTable";
    
    // Basic operations
    @ReadOnly
    @Nullable
    byte[] read(String key);
    
    @ReadOnly 
    @Nullable
    byte[] read(byte[] key);
    
    @WriteOnly
    void write(String key, String value);
    
    @WriteOnly
    void write(String key, byte[] value);
    
    @WriteOnly
    void write(byte[] key, byte[] value);
    
    @WriteOnly
    void increment(byte[] key, long amount);
    
    @WriteOnly
    void increment(String key, long amount);
    
    @WriteOnly
    void delete(String key);
    
    @WriteOnly
    void delete(byte[] key);
    
    // Batch operations
    @ReadOnly
    Map<byte[], byte[]> readAll(byte[][] keys);
    
    @WriteOnly
    void writeAll(Map<byte[], byte[]> entries);
    
    @WriteOnly
    void deleteAll(byte[][] keys);
}

// Key-Value pair representation
public class KeyValue<K, V> {
    public KeyValue(K key, V value) { /* constructor */ }
    public K getKey() { /* returns key */ }
    public V getValue() { /* returns value */ }
}

// Usage example
public class ConfigurationStore {
    private KeyValueTable configTable;
    
    public void storeConfiguration(String key, String value) {
        configTable.write(key, value);
    }
    
    public String getConfiguration(String key) {
        byte[] value = configTable.read(key);
        return value != null ? Bytes.toString(value) : null;
    }
    
    public void updateCounter(String counterName, long increment) {
        configTable.increment(counterName, increment);
    }
    
    public Map<String, String> getAllConfigurations(String[] keys) {
        byte[][] keyBytes = Arrays.stream(keys)
            .map(Bytes::toBytes)
            .toArray(byte[][]::new);
            
        Map<byte[], byte[]> results = configTable.readAll(keyBytes);
        
        return results.entrySet().stream()
            .collect(Collectors.toMap(
                entry -> Bytes.toString(entry.getKey()),
                entry -> Bytes.toString(entry.getValue())
            ));
    }
}

File-Based Datasets

CDAP provides several file-based dataset types for working with HDFS and other file systems:

FileSet Dataset

import io.cdap.cdap.api.dataset.lib.*;
import org.apache.hadoop.fs.Path;
import java.io.IOException;

// FileSet interface for file-based operations
public interface FileSet extends Dataset, BatchReadable<Void, Location>, BatchWritable<Void, Location> {
    
    String TYPE = "fileSet";
    
    // File operations
    Location getLocation(String relativePath) throws IOException;
    Location getBaseLocation() throws IOException;
    
    // Input/Output format configuration
    Map<String, String> getInputFormatConfiguration();
    Map<String, String> getOutputFormatConfiguration();
    
    // Runtime arguments access
    Map<String, String> getRuntimeArguments();
}

// File location abstraction
public interface Location {
    String getName();
    URI toURI();
    boolean exists() throws IOException;
    boolean isDirectory() throws IOException;
    long lastModified() throws IOException;
    long length() throws IOException;
    
    // Stream operations
    InputStream getInputStream() throws IOException;
    OutputStream getOutputStream() throws IOException;
    OutputStream getOutputStream(String permission) throws IOException;
    
    // Directory operations
    boolean mkdirs() throws IOException;
    List<Location> list() throws IOException;
    boolean delete() throws IOException;
    boolean delete(boolean recursive) throws IOException;
    
    // Path operations
    Location append(String child) throws IOException;
    Location append(Path child) throws IOException;
}

// FileSet properties and arguments
public final class FileSetProperties {
    public static final String INPUT_FORMAT = "input.format";
    public static final String OUTPUT_FORMAT = "output.format";
    public static final String INPUT_PROPERTIES_PREFIX = "input.properties.";
    public static final String OUTPUT_PROPERTIES_PREFIX = "output.properties.";
    
    public static Builder builder() { return new Builder(); }
    
    public static class Builder {
        public Builder setInputFormat(Class<? extends InputFormat> inputFormat) { /* set input format */ return this; }
        public Builder setOutputFormat(Class<? extends OutputFormat> outputFormat) { /* set output format */ return this; }
        public Builder setInputProperty(String key, String value) { /* set input property */ return this; }
        public Builder setOutputProperty(String key, String value) { /* set output property */ return this; }
        public DatasetProperties build() { /* build properties */ }
    }
}

PartitionedFileSet Dataset

// Partitioned FileSet for organizing files by partitions
public interface PartitionedFileSet extends Dataset, 
                                           BatchReadable<PartitionKey, PartitionDetail>,
                                           BatchWritable<PartitionKey, PartitionOutput> {
    
    String TYPE = "partitionedFileSet";
    
    // Partition operations
    PartitionDetail getPartition(PartitionKey key);
    Set<PartitionDetail> getPartitions(PartitionFilter filter);
    void addPartition(PartitionKey key, String path);
    void addPartition(PartitionKey key, String path, Map<String, String> metadata);
    void dropPartition(PartitionKey key);
    
    // Output operations
    PartitionOutput getPartitionOutput(PartitionKey key);
    Location getLocation(PartitionKey key);
    
    // FileSet operations
    FileSet getEmbeddedFileSet();
}

// Partition key for organizing data
public class PartitionKey {
    public static Builder builder() { return new Builder(); }
    public Map<String, Comparable<?>> getFields() { /* returns partition fields */ }
    
    public static class Builder {
        public Builder addField(String name, Comparable<?> value) { /* add partition field */ return this; }
        public Builder addStringField(String name, String value) { /* add string field */ return this; }
        public Builder addIntField(String name, int value) { /* add int field */ return this; }
        public Builder addLongField(String name, long value) { /* add long field */ return this; }
        public PartitionKey build() { /* build partition key */ }
    }
}

// Partition metadata and details
public interface PartitionDetail {
    PartitionKey getPartitionKey();
    String getRelativePath();
    Location getLocation() throws IOException;
    Map<String, String> getMetadata();
    long getLastModified();
}

// Partitioning strategy
public abstract class Partitioning {
    public static Builder builder() { return new Builder(); }
    
    public static class Builder {
        public Builder addField(String name, Partitioning.FieldType type) { /* add field */ return this; }
        public Builder addStringField(String name) { /* add string field */ return this; }
        public Builder addIntField(String name) { /* add int field */ return this; }
        public Builder addLongField(String name) { /* add long field */ return this; }
        public Partitioning build() { /* build partitioning */ }
    }
    
    public enum FieldType {
        STRING, INT, LONG
    }
}

Time-Partitioned FileSet

// Time-based partitioning for time-series data
public interface TimePartitionedFileSet extends Dataset,
                                               BatchReadable<Long, TimePartitionDetail>,
                                               BatchWritable<Long, TimePartitionOutput> {
    
    String TYPE = "timePartitionedFileSet";
    
    // Time partition operations
    TimePartitionDetail getPartitionByTime(long time);
    Set<TimePartitionDetail> getPartitionsByTime(long startTime, long endTime);
    TimePartitionOutput getPartitionOutput(long time);
    
    // Partition management
    void addPartition(long time, String path);
    void addPartition(long time, String path, Map<String, String> metadata);
    void dropPartition(long time);
    
    // FileSet operations
    PartitionedFileSet getEmbeddedFileSet();
}

// Time partition representation  
public interface TimePartition {
    long getTime();
    String getRelativePath();
    Location getLocation() throws IOException;
}

// Usage example for ETL processing
public class DailyETLWorkflow extends AbstractWorkflow {
    
    @Override
    public void configure(WorkflowConfigurer configurer) {
        configurer.setName("DailyETLWorkflow");
        
        // Add time-partitioned datasets for daily processing
        configurer.addAction(new DataIngestionAction());
        configurer.addMapReduce("DataTransformationMapReduce");
        configurer.addAction(new PartitionCleanupAction());
    }
    
    public static class DataIngestionAction extends AbstractCustomAction {
        
        @Override
        public void run(CustomActionContext context) throws Exception {
            TimePartitionedFileSet rawData = context.getDataset("raw_data");
            
            // Get today's partition
            long today = DateUtils.truncateToDay(System.currentTimeMillis());
            TimePartitionOutput output = rawData.getPartitionOutput(today);
            
            // Ingest data for today's partition
            Location outputLocation = output.getLocation();
            try (OutputStream os = outputLocation.getOutputStream()) {
                // Write ingested data to partition
                ingestDailyData(os);
            }
            
            // Add partition with metadata
            Map<String, String> metadata = new HashMap<>();
            metadata.put("ingestion.timestamp", String.valueOf(System.currentTimeMillis()));
            metadata.put("source", "daily-feed");
            
            output.addPartition(metadata);
        }
        
        private void ingestDailyData(OutputStream outputStream) throws IOException {
            // Implementation for data ingestion
        }
    }
}

Messaging System

CDAP provides a transactional messaging system for reliable message passing and stream processing:

Messaging Interfaces

import io.cdap.cdap.api.messaging.*;
import io.cdap.cdap.api.*;
import java.nio.charset.StandardCharsets;

// Message publisher interface
@Beta
public interface MessagePublisher {
    
    // Publish single message
    void publish(String namespace, String topic, String payload) throws TopicNotFoundException, IOException, AccessException;
    void publish(String namespace, String topic, String payload, Charset charset) throws TopicNotFoundException, IOException, AccessException;
    void publish(String namespace, String topic, byte[] payload) throws TopicNotFoundException, IOException, AccessException;
    
    // Publish multiple messages  
    void publish(String namespace, String topic, String charset, String... payloads) throws TopicNotFoundException, IOException, AccessException;
    void publish(String namespace, String topic, Charset charset, String... payloads) throws TopicNotFoundException, IOException, AccessException;
    void publish(String namespace, String topic, byte[]... payloads) throws TopicNotFoundException, IOException, AccessException;
    void publish(String namespace, String topic, Iterator<byte[]> payloads) throws TopicNotFoundException, IOException, AccessException;
}

// Message fetcher interface
@Beta
public interface MessageFetcher {
    
    // Fetch messages with limit
    CloseableIterator<Message> fetch(String namespace, String topic, int limit, long afterMessageId) 
        throws TopicNotFoundException, IOException, AccessException;
        
    // Fetch messages with time range
    CloseableIterator<Message> fetch(String namespace, String topic, int limit, long startTime, long endTime)
        throws TopicNotFoundException, IOException, AccessException;
}

// Message representation
public interface Message {
    String getId();
    byte[] getPayload();
    long getPublishTimestamp();
    Map<String, String> getHeaders();
}

// Messaging administration
@Beta  
public interface MessagingAdmin {
    void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException, AccessException;
    void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException, AccessException;
    void deleteTopic(String namespace, String topic) throws TopicNotFoundException, IOException, AccessException;
    List<TopicMetadata> listTopics(String namespace) throws IOException, AccessException;
    TopicMetadata getTopic(String namespace, String topic) throws TopicNotFoundException, IOException, AccessException;
}

// Topic metadata
public class TopicMetadata {
    public static Builder builder(String topic) { return new Builder(topic); }
    
    public String getTopic() { /* returns topic name */ }
    public String getNamespace() { /* returns namespace */ }
    public Map<String, String> getProperties() { /* returns properties */ }
    public int getGeneration() { /* returns generation */ }
    
    public static class Builder {
        public Builder setNamespace(String namespace) { /* set namespace */ return this; }
        public Builder setDescription(String description) { /* set description */ return this; }
        public Builder setProperty(String key, String value) { /* set property */ return this; }
        public Builder setProperties(Map<String, String> properties) { /* set properties */ return this; }
        public TopicMetadata build() { /* build metadata */ }
    }
}

Messaging Context and Usage

// Messaging context for accessing messaging APIs
public interface MessagingContext {
    MessagePublisher getMessagePublisher();
    MessageFetcher getMessageFetcher();
    MessagingAdmin getMessagingAdmin();
}

// Usage in worker programs
public class MessageProcessingWorker extends AbstractWorker {
    
    @Override
    public void configure(WorkerConfigurer configurer) {
        configurer.setName("MessageProcessor");
        configurer.setDescription("Processes messages from topic");
    }
    
    @Override
    public void run() throws Exception {
        WorkerContext context = getContext();
        MessagingContext messagingContext = context.getMessagingContext();
        
        MessageFetcher fetcher = messagingContext.getMessageFetcher();
        MessagePublisher publisher = messagingContext.getMessagePublisher();
        
        String namespace = context.getNamespace();
        long lastProcessedId = getLastProcessedMessageId();
        
        while (context.getState().equals(ProgramRunStatus.RUNNING)) {
            try (CloseableIterator<Message> messages = 
                 fetcher.fetch(namespace, "input-topic", 100, lastProcessedId)) {
                
                while (messages.hasNext()) {
                    Message message = messages.next();
                    
                    // Process message
                    ProcessedMessage processed = processMessage(message);
                    
                    // Publish result
                    if (processed != null) {
                        publisher.publish(namespace, "output-topic", processed.toJson());
                    }
                    
                    lastProcessedId = Long.parseLong(message.getId());
                }
            }
            
            // Save checkpoint
            saveLastProcessedMessageId(lastProcessedId);
            
            // Sleep before next fetch
            Thread.sleep(1000);
        }
    }
    
    private ProcessedMessage processMessage(Message message) {
        // Implementation for message processing
        return new ProcessedMessage(new String(message.getPayload(), StandardCharsets.UTF_8));
    }
    
    private long getLastProcessedMessageId() {
        // Implementation to retrieve last processed message ID
        return 0L;
    }
    
    private void saveLastProcessedMessageId(long messageId) {
        // Implementation to save checkpoint
    }
}

// Usage in MapReduce for batch message processing  
public class MessageBatchProcessor extends AbstractMapReduce {
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        
        // Configure to read from messaging system
        MessagingUtils.configureInput(job, context.getNamespace(), "batch-topic");
        
        // Configure output dataset
        context.setOutput(Output.ofDataset("processed_messages"));
        
        job.setMapperClass(MessageMapper.class);
        job.setReducerClass(MessageAggregator.class);
    }
    
    public static class MessageMapper extends Mapper<LongWritable, Message, Text, IntWritable> {
        
        @Override
        protected void map(LongWritable key, Message message, Context context) 
            throws IOException, InterruptedException {
            
            String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
            JsonObject json = new JsonParser().parse(payload).getAsJsonObject();
            
            String eventType = json.get("eventType").getAsString();
            context.write(new Text(eventType), new IntWritable(1));
        }
    }
    
    public static class MessageAggregator extends Reducer<Text, IntWritable, byte[], Put> {
        
        @Override
        protected void reduce(Text eventType, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
            
            int total = 0;
            for (IntWritable count : counts) {
                total += count.get();
            }
            
            Put put = new Put(Bytes.toBytes(eventType.toString()));
            put.add("stats", "count", total);
            put.add("stats", "timestamp", System.currentTimeMillis());
            
            context.write(Bytes.toBytes(eventType.toString()), put);
        }
    }
}

Advanced Dataset Patterns

Custom Dataset Implementation

// Custom dataset definition
public abstract class AbstractDatasetDefinition<T extends Dataset> 
    implements DatasetDefinition<T> {
    
    private final String name;
    
    public AbstractDatasetDefinition(String name) {
        this.name = name;
    }
    
    @Override
    public String getName() {
        return name;
    }
    
    @Override
    public abstract DatasetSpecification configure(String instanceName, DatasetProperties properties);
    
    @Override
    public abstract T getDataset(DatasetContext datasetContext, DatasetSpecification spec,
                                Map<String, String> arguments, ClassLoader classLoader) throws IOException;
}

// Dataset state persistence
public interface DatasetStatePersistor {
    void persistState(String name, byte[] state) throws IOException;
    byte[] readState(String name) throws IOException;
    void deleteState(String name) throws IOException;
}

// Composite dataset for combining multiple datasets
public abstract class CompositeDatasetDefinition<T extends Dataset> 
    extends AbstractDatasetDefinition<T> {
    
    protected CompositeDatasetDefinition(String name) {
        super(name);
    }
    
    // Methods for managing constituent datasets
    protected abstract Map<String, DatasetSpecification> getConstituentDatasets(DatasetProperties properties);
}

The CDAP data management framework provides a comprehensive, abstracted approach to data storage and access, enabling applications to work with various storage systems through consistent APIs while maintaining enterprise-grade features like transactions, security, and operational control.

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap

docs

application-framework.md

data-management.md

data-processing.md

index.md

operational.md

plugin-system.md

security-metadata.md

tile.json