CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-api

Core application programming interface for the Cask Data Application Platform enabling development of scalable data processing applications on Hadoop ecosystems.

Pending
Overview
Eval results
Files

dataset-management.mddocs/

Dataset Management

CDAP's Dataset Management system provides a comprehensive abstraction layer for data storage and access, supporting both built-in dataset types and custom implementations with ACID transaction support.

Core Dataset Interfaces

Dataset

// Root contract for all datasets. Extends Closeable so instances can be used
// with try-with-resources; the override narrows close() to declare no
// IOException (Closeable.close() otherwise throws it).
public interface Dataset extends Closeable {
    @Override
    void close();  // release any resources held by this dataset instance
}

Base interface for all datasets. All dataset implementations must implement this interface and provide proper resource cleanup.

DatasetDefinition

// SPI for a dataset type: configures instance specifications, instantiates the
// runtime dataset (D), and creates the admin handle (A) for lifecycle work.
public interface DatasetDefinition<D extends Dataset, A extends DatasetAdmin> {
    // Name of the dataset type this definition implements.
    String getName();
    
    // Instantiates the runtime dataset for an existing instance; may perform
    // I/O (declared IOException). classLoader loads user-supplied classes.
    D getDataset(DatasetContext datasetContext, DatasetSpecification spec, 
                 Map<String, String> arguments, ClassLoader classLoader) throws IOException;
    
    // Creates the admin handle used for lifecycle operations (create/drop/...).
    A getAdmin(DatasetContext datasetContext, DatasetSpecification spec, 
               ClassLoader classLoader) throws IOException;
    
    // Produces the concrete specification for a new instance from properties.
    DatasetSpecification configure(String instanceName, DatasetProperties properties);
    
    // Recomputes the specification for changed properties; throws when the
    // change cannot be applied to the existing instance.
    DatasetSpecification reconfigure(String instanceName, DatasetProperties newProperties,
                                   DatasetSpecification currentSpec) throws IncompatibleUpdateException;
}

Defines how dataset instances are created, configured, and managed. Custom dataset types implement this interface.

AbstractDatasetDefinition

// Convenience base class for DatasetDefinition implementations: stores the
// dataset type name and supplies default configure/reconfigure behavior.
// (API summary: method bodies are omitted in this documentation.)
public abstract class AbstractDatasetDefinition<D extends Dataset, A extends DatasetAdmin> 
    implements DatasetDefinition<D, A> {
    
    // Dataset type name, fixed at construction time.
    protected final String name;
    
    protected AbstractDatasetDefinition(String name);
    
    @Override
    public final String getName();  // final: subclasses cannot change the type name
    
    @Override
    public DatasetSpecification configure(String instanceName, DatasetProperties properties);
    
    @Override
    public DatasetSpecification reconfigure(String instanceName, DatasetProperties newProperties,
                                          DatasetSpecification currentSpec) throws IncompatibleUpdateException;
}

Base implementation for dataset definitions providing common functionality.

Dataset Administration

DatasetAdmin

// Lifecycle operations on a dataset instance's physical storage. Every
// operation can touch underlying storage and may fail with IOException.
public interface DatasetAdmin {
    boolean exists() throws IOException;   // whether the instance's storage exists
    void create() throws IOException;      // provision storage for the instance
    void drop() throws IOException;        // remove the instance and its data
    void truncate() throws IOException;    // clear the instance's data
    void upgrade() throws IOException;     // presumably migrates storage after a type upgrade — verify against CDAP docs
}

Administrative operations for dataset lifecycle management.

DatasetManager

// Access to dataset instances by name, with explicit release when done.
public interface DatasetManager {
    // Returns the dataset registered under the given name.
    <T extends Dataset> T getDataset(String name) throws DatasetInstantiationException;
    // Same, passing runtime arguments to the dataset instance.
    <T extends Dataset> T getDataset(String name, Map<String, String> arguments) 
        throws DatasetInstantiationException;
    // Signals that the caller is done with the given dataset instance.
    void releaseDataset(Dataset dataset);
}

Manager for dataset instance creation and lifecycle.

Dataset Configuration

DatasetProperties

// String key/value configuration for a dataset instance, constructed via a
// fluent Builder. (API summary: method bodies are omitted.)
public class DatasetProperties {
    public static Builder builder();  // entry point for building a properties set
    
    public Map<String, String> getProperties();          // all configured properties
    public String get(String key);                       // value for key; behavior on a missing key is not shown here
    public String get(String key, String defaultValue);  // value for key, or defaultValue when absent
    
    public static class Builder {
        public Builder add(String key, String value);          // fluent: add one entry
        public Builder addAll(Map<String, String> properties); // fluent: add all entries
        public DatasetProperties build();
    }
}

Configuration properties for dataset instances.

DatasetSpecification

// Fully-resolved description of a dataset instance: its name, type,
// properties, and the specifications of any embedded (nested) datasets.
// (API summary: method bodies are omitted.)
public class DatasetSpecification {
    public String getName();
    public String getType();
    public Map<String, String> getProperties();
    public Map<String, DatasetSpecification> getSpecifications();  // nested dataset specs, keyed by name
    
    public static Builder builder(String name, String type);
    
    public static class Builder {
        public Builder properties(Map<String, String> properties);       // add many properties
        public Builder property(String key, String value);               // add one property
        public Builder datasets(DatasetSpecification... specifications); // embed nested dataset specs
        public DatasetSpecification build();
    }
}

Complete specification of a dataset including type, properties, and nested datasets.

Built-in Dataset Types

Key-Value Storage

// Key-value dataset offering String and byte[] overloads for keys and values.
// (API summary: method bodies are omitted.)
public class KeyValueTable extends AbstractDataset {
    // Writes: overloads for String/String, byte[]/byte[], and String/byte[].
    public void write(String key, String value);
    public void write(byte[] key, byte[] value);
    public void write(String key, byte[] value);
    
    // Reads return the stored value for the key.
    public String read(String key);
    public byte[] read(byte[] key);
    
    public void delete(String key);
    public void delete(byte[] key);
    
    // Iterates entries between startKey and stopKey — bound inclusivity is not
    // shown here; the returned iterator must be closed by the caller.
    public CloseableIterator<KeyValue<byte[], byte[]>> scan(byte[] startKey, byte[] stopKey);
}

Simple key-value storage supporting string and byte array keys/values.

Table Storage

// HBase-style row/column table. Columns are addressable as byte[] or String;
// the String overloads are convenience variants of the byte[] forms.
public interface Table extends Dataset {
    // Single-cell and row reads; Get describes a read as an object.
    byte[] read(byte[] row, byte[] column);
    byte[] read(byte[] row, String column);
    Row get(byte[] row);
    Row get(byte[] row, byte[][] columns);
    Row get(byte[] row, String[] columns);
    Row get(Get get);
    
    // Single-cell writes; Put bundles multiple cells for one row.
    void put(byte[] row, byte[] column, byte[] value);
    void put(byte[] row, String column, byte[] value);
    void put(byte[] row, String column, String value);
    void put(Put put);
    
    // Deletes: whole row, single cell, or as described by a Delete object.
    void delete(byte[] row);
    void delete(byte[] row, byte[] column);
    void delete(byte[] row, String column);
    void delete(Delete delete);
    
    // Row-range scans; bound semantics are not shown in this summary.
    Scanner scan(byte[] startRow, byte[] stopRow);
    Scanner scan(Scan scan);
    
    // Numeric increment of a cell value (described as atomic below).
    void increment(byte[] row, byte[] column, long amount);
    void increment(Increment increment);
}

HBase-style table with row/column storage and atomic operations.

Object Storage

// Typed object store: persists objects of type T keyed by byte[] or String.
// Serialization is handled by the dataset; the mechanism is not shown here.
// (API summary: method bodies are omitted.)
public class ObjectStore<T> extends AbstractDataset {
    public void write(byte[] key, T object);
    public void write(String key, T object);
    
    public T read(byte[] key);   // stored object for the key
    public T read(String key);
    
    public void delete(byte[] key);
    public void delete(String key);
    
    // Iterates stored objects for keys in the given range; caller must close.
    public CloseableIterator<KeyValue<byte[], T>> scan(byte[] startKey, byte[] stopKey);
}

Type-safe object storage with automatic serialization/deserialization.

File Storage

// File-based dataset: resolves Locations relative to a base location and
// exposes the input/output locations and arguments for the current run.
public interface FileSet extends Dataset {
    Location getLocation(String relativePath);  // location resolved under the base
    Location getBaseLocation();                 // root location of this file set
    
    Iterable<Location> getInputLocations();     // locations configured as input
    Location getOutputLocation();               // target location for output
    
    Map<String, String> getInputArguments();    // runtime input arguments
    Map<String, String> getOutputArguments();   // runtime output arguments
}

File-based dataset for storing and accessing files in distributed storage.

Partitioned File Storage

// File set organized into partitions addressed by PartitionKey.
public interface PartitionedFileSet extends Dataset {
    // Obtain a writer handle for a new partition, optionally with arguments.
    PartitionOutput getPartitionOutput(PartitionKey key);
    PartitionOutput getPartitionOutput(PartitionKey key, DatasetArguments arguments);
    
    // Look up one partition by key, or all partitions matching a filter.
    Partition getPartition(PartitionKey key);
    Set<Partition> getPartitions(PartitionFilter filter);
    
    // Register an existing path as a partition, optionally with metadata.
    void addPartition(PartitionKey key, String path);
    void addPartition(PartitionKey key, String path, Map<String, String> metadata);
    
    // Remove the partition registered under the given key.
    void dropPartition(PartitionKey key);
    
    // Consumer for processing partitions incrementally.
    PartitionConsumer getPartitionConsumer();
}

Partitioned file storage supporting efficient querying and processing of large datasets organized by partition keys.

Custom Dataset Implementation

Abstract Base Classes

// Base class for custom dataset implementations: holds the instance's
// specification and runtime arguments, and provides a no-op close().
// (Accessor bodies are omitted in this API summary.)
public abstract class AbstractDataset implements Dataset {
    protected final DatasetSpecification spec;       // this instance's specification
    protected final Map<String, String> arguments;   // runtime arguments for this instance
    
    protected AbstractDataset(DatasetSpecification spec, Map<String, String> arguments);
    
    public final DatasetSpecification getSpec();
    public final Map<String, String> getArguments();
    
    @Override
    public void close() {
        // Default implementation - override if needed
        // (this base class holds no closeable resources itself)
    }
}

Base class for custom dataset implementations.

Dataset Context

DatasetContext

// Per-program access to datasets; the surface matches DatasetManager.
public interface DatasetContext {
    // Returns the dataset registered under the given name.
    <T extends Dataset> T getDataset(String name) throws DatasetInstantiationException;
    // Same, passing runtime arguments to the dataset instance.
    <T extends Dataset> T getDataset(String name, Map<String, String> arguments) 
        throws DatasetInstantiationException;
    // Signals that the caller is done with the given dataset instance.
    void releaseDataset(Dataset dataset);
}

Context interface providing dataset access within programs and services.

Usage Examples

Basic Dataset Operations

// Example MapReduce program: declares two datasets in configure() and then
// reads and writes them through the runtime context in initialize().
public class DatasetExample extends AbstractMapReduce {
    
    @Override
    public void configure(MapReduceConfigurer configurer) {
        // Declare the dataset instances this program uses at runtime.
        configurer.useDataset("userProfiles");
        configurer.useDataset("userScores");
    }
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        // Resolve the declared datasets from the runtime context.
        KeyValueTable profileTable = context.getDataset("userProfiles");
        ObjectStore<UserScore> scoreStore = context.getDataset("userScores");
        
        // Example reads.
        String existingProfile = profileTable.read("user123");
        UserScore existingScore = scoreStore.read("user123");
        
        // Example writes.
        profileTable.write("user456", "profile data");
        scoreStore.write("user456", new UserScore(100, "Gold"));
    }
}

Dataset Creation in Application

// Example application that declares its dataset instances in configure().
public class DataApplication extends AbstractApplication<Config> {
    
    @Override
    public void configure() {
        // Create simple key-value dataset
        createDataset("userCache", KeyValueTable.class);
        
        // Create table with properties
        createDataset("userTable", Table.class, 
                     DatasetProperties.builder()
                         .add("table.rowkey.ttl", "3600")
                         .build());
        
        // Create partitioned file set
        // NOTE(review): logSchema is not defined anywhere in this snippet; in
        // a real application it must be a field or constant in scope.
        createDataset("logs", PartitionedFileSet.class,
                     DatasetProperties.builder()
                         .add("schema", logSchema)
                         .add("partitioning", "year/month/day")
                         .build());
        
        addMapReduce(new DataProcessor());
    }
}

Custom Dataset Implementation

// Example custom dataset: a store of named counters layered on an embedded
// Table. Each counter occupies its own row under the single column "count".
public class CounterDataset extends AbstractDataset {
    private final Table table;
    
    public CounterDataset(DatasetSpecification spec, Map<String, String> arguments, Table table) {
        super(spec, arguments);
        this.table = table;
    }
    
    // Adds delta to the named counter using the table's increment operation.
    public void increment(String counter, long delta) {
        byte[] row = counter.getBytes();
        byte[] column = "count".getBytes();
        table.increment(row, column, delta);
    }
    
    // Returns the counter's current value, or 0 if it was never incremented.
    public long get(String counter) {
        byte[] stored = table.read(counter.getBytes(), "count".getBytes());
        if (stored == null) {
            return 0;
        }
        return Bytes.toLong(stored);
    }
    
    @Override
    public void close() {
        // Release the embedded table's resources along with this dataset.
        table.close();
    }
}

// Definition for CounterDataset: builds the counter facade on top of an
// embedded Table obtained from the DatasetContext.
public class CounterDatasetDefinition extends AbstractDatasetDefinition<CounterDataset, DatasetAdmin> {
    
    public CounterDatasetDefinition(String name) {
        super(name);
    }
    
    @Override
    public CounterDataset getDataset(DatasetContext datasetContext, DatasetSpecification spec,
                                   Map<String, String> arguments, ClassLoader classLoader) throws IOException {
        // Resolve the embedded table and wrap it in the counter facade.
        Table table = datasetContext.getDataset("table");
        return new CounterDataset(spec, arguments, table);
    }
    
    @Override
    public DatasetAdmin getAdmin(DatasetContext datasetContext, DatasetSpecification spec,
                               ClassLoader classLoader) throws IOException {
        // NOTE(review): the Table interface documented earlier in this page
        // declares no getAdmin() method — verify this call against the actual
        // CDAP API before relying on this example.
        return datasetContext.getDataset("table").getAdmin();
    }
}

Transaction Support

// Example service performing a multi-dataset update inside one transaction:
// both balance updates and the audit record commit together or not at all.
public class TransactionalDatasetExample extends AbstractService {
    
    @UseDataSet("userAccounts")
    private Table accounts;
    
    @UseDataSet("transactions")
    private ObjectStore<Transaction> transactions;
    
    // Moves `amount` from one account to the other atomically.
    // NOTE(review): double is a questionable type for money (binary rounding);
    // a scaled long or BigDecimal would be safer — kept as-is for this example.
    public void transferFunds(String fromAccount, String toAccount, double amount) {
        Transactionals.execute(this, new TxRunnable() {
            @Override
            public void run(DatasetContext context) throws Exception {
                // Resolve transactional dataset handles from the tx context.
                Table txAccounts = context.getDataset("userAccounts");
                ObjectStore<Transaction> txLog = context.getDataset("transactions");
                
                double sourceBalance = getBalance(txAccounts, fromAccount);
                double targetBalance = getBalance(txAccounts, toAccount);
                
                // Guard: reject the transfer before touching any state.
                if (sourceBalance < amount) {
                    throw new InsufficientFundsException();
                }
                
                setBalance(txAccounts, fromAccount, sourceBalance - amount);
                setBalance(txAccounts, toAccount, targetBalance + amount);
                
                // Record an audit entry keyed by a random UUID.
                Transaction record =
                    new Transaction(fromAccount, toAccount, amount, System.currentTimeMillis());
                txLog.write(UUID.randomUUID().toString(), record);
            }
        });
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-api

docs

annotations.md

application-framework.md

dataset-management.md

index.md

mapreduce-programs.md

plugin-framework.md

scheduling.md

service-programs.md

spark-programs.md

system-services.md

transactions.md

worker-programs.md

workflow-programs.md

tile.json