Core application programming interface (API) for the Cask Data Application Platform (CDAP), enabling development of scalable data-processing applications on Hadoop ecosystems.
—
CDAP's Dataset Management system provides a comprehensive abstraction layer for data storage and access, supporting both built-in dataset types and custom implementations with ACID transaction support.
/**
 * Base interface for all datasets. All dataset implementations must extend this
 * interface and provide proper resource cleanup.
 */
public interface Dataset extends Closeable {

  /**
   * Releases any resources held by this dataset. Narrows {@link Closeable#close()}
   * by not declaring a checked exception.
   */
  @Override
  void close();
}
/**
 * Defines how dataset instances are created, configured, and managed.
 * Custom dataset types implement this interface.
 *
 * @param <D> type of dataset instances this definition creates
 * @param <A> type of admin used to manage instances of this dataset type
 */
public interface DatasetDefinition<D extends Dataset, A extends DatasetAdmin> {

  /** Returns the name of this dataset type. */
  String getName();

  /**
   * Creates a dataset instance for the given specification and runtime arguments,
   * loading implementation classes through the supplied class loader.
   *
   * @throws IOException if the dataset cannot be instantiated
   */
  D getDataset(DatasetContext datasetContext, DatasetSpecification spec,
      Map<String, String> arguments, ClassLoader classLoader) throws IOException;

  /**
   * Creates an admin for managing the lifecycle of the dataset instance
   * described by the given specification.
   *
   * @throws IOException if the admin cannot be instantiated
   */
  A getAdmin(DatasetContext datasetContext, DatasetSpecification spec,
      ClassLoader classLoader) throws IOException;

  /** Produces the specification for a new instance with the given name and properties. */
  DatasetSpecification configure(String instanceName, DatasetProperties properties);

  /**
   * Produces an updated specification for an existing instance.
   *
   * @throws IncompatibleUpdateException if the new properties cannot be applied
   *     to the existing instance
   */
  DatasetSpecification reconfigure(String instanceName, DatasetProperties newProperties,
      DatasetSpecification currentSpec) throws IncompatibleUpdateException;
}
/**
 * Base implementation for dataset definitions providing common functionality.
 * (Method bodies are omitted in this API summary.)
 */
public abstract class AbstractDatasetDefinition<D extends Dataset, A extends DatasetAdmin>
    implements DatasetDefinition<D, A> {

  /** Dataset type name, fixed at construction time. */
  protected final String name;

  /** Creates a definition with the given dataset type name. */
  protected AbstractDatasetDefinition(String name);

  /** Returns the dataset type name; final, so subclasses cannot override it. */
  @Override
  public final String getName();

  /** Default implementation of instance configuration; overridable by subclasses. */
  @Override
  public DatasetSpecification configure(String instanceName, DatasetProperties properties);

  /**
   * Default implementation of instance reconfiguration; overridable by subclasses.
   *
   * @throws IncompatibleUpdateException if the new properties are incompatible
   */
  @Override
  public DatasetSpecification reconfigure(String instanceName, DatasetProperties newProperties,
      DatasetSpecification currentSpec) throws IncompatibleUpdateException;
}
/**
 * Administrative operations for dataset lifecycle management.
 * All operations may fail with {@link IOException} on storage errors.
 */
public interface DatasetAdmin {

  /** Returns whether the dataset instance exists in the underlying storage. */
  boolean exists() throws IOException;

  /** Creates the dataset instance in the underlying storage. */
  void create() throws IOException;

  /** Deletes the dataset instance and its data. */
  void drop() throws IOException;

  /** Removes all data from the dataset instance, keeping the instance itself. */
  void truncate() throws IOException;

  /** Upgrades the dataset instance — exact semantics are implementation-defined. */
  void upgrade() throws IOException;
}
/**
 * Manager for dataset instance creation and lifecycle.
 * Declares the same access methods as {@code DatasetContext}.
 */
public interface DatasetManager {

  /** Returns the dataset instance with the given name. */
  <T extends Dataset> T getDataset(String name) throws DatasetInstantiationException;

  /** Returns the dataset instance with the given name, using the given runtime arguments. */
  <T extends Dataset> T getDataset(String name, Map<String, String> arguments)
      throws DatasetInstantiationException;

  /** Releases a dataset previously obtained from this manager. */
  void releaseDataset(Dataset dataset);
}
/** Configuration properties for dataset instances. */
public class DatasetProperties {

  /** Returns a new builder for assembling a set of properties. */
  public static Builder builder();

  /** Returns all properties as a key/value map. */
  public Map<String, String> getProperties();

  /** Returns the value of the given property (body omitted — presumably null if absent; confirm). */
  public String get(String key);

  /** Returns the value of the given property, or defaultValue if it is not set. */
  public String get(String key, String defaultValue);

  /** Fluent builder for {@link DatasetProperties}. */
  public static class Builder {

    /** Adds a single property. */
    public Builder add(String key, String value);

    /** Adds all entries of the given map as properties. */
    public Builder addAll(Map<String, String> properties);

    /** Creates the immutable properties object. */
    public DatasetProperties build();
  }
}
/** Complete specification of a dataset including type, properties, and nested datasets. */
public class DatasetSpecification {

  /** Returns the name of the dataset instance. */
  public String getName();

  /** Returns the name of the dataset type. */
  public String getType();

  /** Returns the properties this instance was configured with. */
  public Map<String, String> getProperties();

  /** Returns the specifications of embedded (nested) datasets. */
  public Map<String, DatasetSpecification> getSpecifications();

  /** Returns a builder for a specification with the given instance name and type. */
  public static Builder builder(String name, String type);

  /** Fluent builder for {@link DatasetSpecification}. */
  public static class Builder {

    /** Sets multiple properties at once. */
    public Builder properties(Map<String, String> properties);

    /** Sets a single property. */
    public Builder property(String key, String value);

    /** Adds specifications for embedded datasets. */
    public Builder datasets(DatasetSpecification... specifications);

    /** Creates the immutable specification. */
    public DatasetSpecification build();
  }
}
/**
 * Simple key-value storage supporting string and byte array keys/values.
 * NOTE(review): the charset used to convert String keys/values to bytes is not
 * shown in this summary — confirm (likely UTF-8) before relying on byte layout.
 */
public class KeyValueTable extends AbstractDataset {

  /** Writes a string value under a string key. */
  public void write(String key, String value);

  /** Writes a byte-array value under a byte-array key. */
  public void write(byte[] key, byte[] value);

  /** Writes a byte-array value under a string key. */
  public void write(String key, byte[] value);

  /** Reads the value stored under a string key. */
  public String read(String key);

  /** Reads the value stored under a byte-array key. */
  public byte[] read(byte[] key);

  /** Deletes the entry with the given string key. */
  public void delete(String key);

  /** Deletes the entry with the given byte-array key. */
  public void delete(byte[] key);

  /** Scans entries in the key range [startKey, stopKey); caller must close the iterator. */
  public CloseableIterator<KeyValue<byte[], byte[]>> scan(byte[] startKey, byte[] stopKey);
}
/**
 * HBase-style table with row/column storage and atomic operations.
 * Most operations come in overloads taking raw byte[] or String column names.
 */
public interface Table extends Dataset {

  /** Reads the value of a single column of a row. */
  byte[] read(byte[] row, byte[] column);

  /** Reads the value of a single column (by string name) of a row. */
  byte[] read(byte[] row, String column);

  /** Returns all columns of a row. */
  Row get(byte[] row);

  /** Returns the selected columns of a row. */
  Row get(byte[] row, byte[][] columns);

  /** Returns the selected columns (by string name) of a row. */
  Row get(byte[] row, String[] columns);

  /** Performs a read described by a {@code Get} object. */
  Row get(Get get);

  /** Writes a single column value. */
  void put(byte[] row, byte[] column, byte[] value);

  /** Writes a single column value, with the column named as a string. */
  void put(byte[] row, String column, byte[] value);

  /** Writes a single string value, with the column named as a string. */
  void put(byte[] row, String column, String value);

  /** Performs a write described by a {@code Put} object. */
  void put(Put put);

  /** Deletes an entire row. */
  void delete(byte[] row);

  /** Deletes a single column of a row. */
  void delete(byte[] row, byte[] column);

  /** Deletes a single column (by string name) of a row. */
  void delete(byte[] row, String column);

  /** Performs a delete described by a {@code Delete} object. */
  void delete(Delete delete);

  /** Scans rows in the range [startRow, stopRow). */
  Scanner scan(byte[] startRow, byte[] stopRow);

  /** Performs a scan described by a {@code Scan} object. */
  Scanner scan(Scan scan);

  /** Atomically increments a numeric column value by the given amount. */
  void increment(byte[] row, byte[] column, long amount);

  /** Performs increments described by an {@code Increment} object. */
  void increment(Increment increment);
}
/**
 * Type-safe object storage with automatic serialization/deserialization.
 *
 * @param <T> type of the stored objects
 */
public class ObjectStore<T> extends AbstractDataset {

  /** Writes an object under a byte-array key. */
  public void write(byte[] key, T object);

  /** Writes an object under a string key. */
  public void write(String key, T object);

  /** Reads and deserializes the object stored under a byte-array key. */
  public T read(byte[] key);

  /** Reads and deserializes the object stored under a string key. */
  public T read(String key);

  /** Deletes the object stored under a byte-array key. */
  public void delete(byte[] key);

  /** Deletes the object stored under a string key. */
  public void delete(String key);

  /** Scans objects in the key range [startKey, stopKey); caller must close the iterator. */
  public CloseableIterator<KeyValue<byte[], T>> scan(byte[] startKey, byte[] stopKey);
}
/** File-based dataset for storing and accessing files in distributed storage. */
public interface FileSet extends Dataset {

  /** Returns the location for the given path, relative to this file set's base. */
  Location getLocation(String relativePath);

  /** Returns the base location of this file set. */
  Location getBaseLocation();

  /** Returns the locations configured as input (e.g. for batch processing). */
  Iterable<Location> getInputLocations();

  /** Returns the location configured as output. */
  Location getOutputLocation();

  /** Returns the runtime arguments that configure input behavior. */
  Map<String, String> getInputArguments();

  /** Returns the runtime arguments that configure output behavior. */
  Map<String, String> getOutputArguments();
}
/**
 * Partitioned file storage supporting efficient querying and processing of
 * large datasets organized by partition keys.
 */
public interface PartitionedFileSet extends Dataset {

  /** Returns a handle for writing a new partition with the given key. */
  PartitionOutput getPartitionOutput(PartitionKey key);

  /** Returns a handle for writing a new partition, with extra dataset arguments. */
  PartitionOutput getPartitionOutput(PartitionKey key, DatasetArguments arguments);

  /** Returns the partition with the given key. */
  Partition getPartition(PartitionKey key);

  /** Returns all partitions matching the given filter. */
  Set<Partition> getPartitions(PartitionFilter filter);

  /** Registers an existing path as a partition under the given key. */
  void addPartition(PartitionKey key, String path);

  /** Registers an existing path as a partition, attaching the given metadata. */
  void addPartition(PartitionKey key, String path, Map<String, String> metadata);

  /** Removes the partition with the given key. */
  void dropPartition(PartitionKey key);

  /** Returns a consumer for incrementally processing partitions. */
  PartitionConsumer getPartitionConsumer();
}
/** Base class for custom dataset implementations. */
public abstract class AbstractDataset implements Dataset {

  /** Specification this dataset instance was created from. */
  protected final DatasetSpecification spec;

  /** Runtime arguments supplied for this instance. */
  protected final Map<String, String> arguments;

  /** Creates a dataset from its specification and runtime arguments. */
  protected AbstractDataset(DatasetSpecification spec, Map<String, String> arguments);

  /** Returns the specification this dataset was created from. */
  public final DatasetSpecification getSpec();

  /** Returns the runtime arguments supplied for this instance. */
  public final Map<String, String> getArguments();

  /** No-op by default; subclasses that hold resources should override. */
  @Override
  public void close() {
    // Default implementation - override if needed
  }
}
/** Context interface providing dataset access within programs and services. */
public interface DatasetContext {

  /** Returns the dataset instance with the given name. */
  <T extends Dataset> T getDataset(String name) throws DatasetInstantiationException;

  /** Returns the dataset instance with the given name, using the given runtime arguments. */
  <T extends Dataset> T getDataset(String name, Map<String, String> arguments)
      throws DatasetInstantiationException;

  /** Releases a dataset previously obtained from this context. */
  void releaseDataset(Dataset dataset);
}
/**
 * Example MapReduce program: datasets are declared in configure() and accessed
 * through the runtime context in initialize().
 */
public class DatasetExample extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    // Declare the datasets this program uses so the platform provides them at runtime.
    configurer.useDataset("userProfiles");
    configurer.useDataset("userScores");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Access datasets in the context
    KeyValueTable profiles = context.getDataset("userProfiles");
    ObjectStore<UserScore> scores = context.getDataset("userScores");
    // Read data
    String profile = profiles.read("user123");
    UserScore score = scores.read("user123");
    // Write data
    profiles.write("user456", "profile data");
    scores.write("user456", new UserScore(100, "Gold"));
  }
}

/**
 * Example application creating datasets of several types at configuration time.
 */
public class DataApplication extends AbstractApplication<Config> {

  @Override
  public void configure() {
    // Create simple key-value dataset
    createDataset("userCache", KeyValueTable.class);
    // Create table with properties
    createDataset("userTable", Table.class,
        DatasetProperties.builder()
            .add("table.rowkey.ttl", "3600")
            .build());
    // Create partitioned file set
    // NOTE(review): logSchema is not defined in this snippet — presumably a
    // field or constant of the application; confirm before reusing this example.
    createDataset("logs", PartitionedFileSet.class,
        DatasetProperties.builder()
            .add("schema", logSchema)
            .add("partitioning", "year/month/day")
            .build());
    addMapReduce(new DataProcessor());
  }
}

/**
 * Example custom dataset: a named-counter store built by composing a Table.
 */
public class CounterDataset extends AbstractDataset {

  // Underlying storage; one row per counter, value kept in column "count".
  private final Table table;

  public CounterDataset(DatasetSpecification spec, Map<String, String> arguments, Table table) {
    super(spec, arguments);
    this.table = table;
  }

  /** Atomically adds delta to the named counter (delegates to Table.increment). */
  public void increment(String counter, long delta) {
    // NOTE(review): getBytes() uses the platform default charset; prefer an
    // explicit charset (e.g. UTF-8) for portable row keys.
    table.increment(counter.getBytes(), "count".getBytes(), delta);
  }

  /** Returns the counter's current value, or 0 if it was never incremented. */
  public long get(String counter) {
    byte[] value = table.read(counter.getBytes(), "count".getBytes());
    return value == null ? 0 : Bytes.toLong(value);
  }

  /** Closes the underlying table. */
  @Override
  public void close() {
    table.close();
  }
}
/**
 * Example dataset definition for CounterDataset, composing an embedded
 * dataset named "table".
 */
public class CounterDatasetDefinition extends AbstractDatasetDefinition<CounterDataset, DatasetAdmin> {

  public CounterDatasetDefinition(String name) {
    super(name);
  }

  /** Creates a CounterDataset backed by the embedded "table" dataset. */
  @Override
  public CounterDataset getDataset(DatasetContext datasetContext, DatasetSpecification spec,
      Map<String, String> arguments, ClassLoader classLoader) throws IOException {
    Table table = datasetContext.getDataset("table");
    return new CounterDataset(spec, arguments, table);
  }

  /** Delegates administration to the embedded "table" dataset. */
  @Override
  public DatasetAdmin getAdmin(DatasetContext datasetContext, DatasetSpecification spec,
      ClassLoader classLoader) throws IOException {
    // NOTE(review): the Dataset interface shown in this document does not declare
    // getAdmin(); confirm the actual type returned by getDataset("table") here.
    return datasetContext.getDataset("table").getAdmin();
  }
}

/**
 * Example service performing a funds transfer across two datasets inside a
 * single transaction.
 */
public class TransactionalDatasetExample extends AbstractService {

  @UseDataSet("userAccounts")
  private Table accounts;

  @UseDataSet("transactions")
  private ObjectStore<Transaction> transactions;

  public void transferFunds(String fromAccount, String toAccount, double amount) {
    // NOTE(review): double is used for money in this example; production code
    // should use BigDecimal or a scaled long to avoid rounding errors.
    Transactionals.execute(this, new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        // Datasets are re-acquired from the transactional context here, shadowing
        // the injected fields — presumably to bind them to this transaction; confirm.
        Table accounts = context.getDataset("userAccounts");
        ObjectStore<Transaction> transactions = context.getDataset("transactions");
        // Read current balances
        // NOTE(review): getBalance/setBalance are not defined in this snippet.
        double fromBalance = getBalance(accounts, fromAccount);
        double toBalance = getBalance(accounts, toAccount);
        // Validate and perform transfer
        if (fromBalance >= amount) {
          setBalance(accounts, fromAccount, fromBalance - amount);
          setBalance(accounts, toAccount, toBalance + amount);
          // Log transaction
          Transaction tx = new Transaction(fromAccount, toAccount, amount, System.currentTimeMillis());
          transactions.write(UUID.randomUUID().toString(), tx);
        } else {
          // Aborts the transaction, rolling back both balance updates.
          throw new InsufficientFundsException();
        }
      }
    });
  }
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api