Core data management capabilities for CDAP including dataset operations, metadata management, lineage tracking, audit functionality, and data registry services for Hadoop-based applications.
—
Comprehensive dataset lifecycle management including creation, configuration, access, and administration across multiple storage backends with transaction support and lineage tracking. The Dataset Framework provides a unified API for managing datasets regardless of underlying storage technology.
The primary interface for all dataset management operations, providing comprehensive lifecycle management across different storage backends.
/**
 * Primary interface for all dataset management operations, spanning module
 * registration, instance lifecycle, metadata queries, dataset access, and
 * lineage recording. Every method may fail with DatasetManagementException;
 * operations that touch underlying storage also declare IOException.
 */
public interface DatasetFramework {
// Module Management
// Registers a dataset module (a bundle of dataset type implementations) under the given id.
void addModule(DatasetModuleId moduleId, DatasetModule module) throws DatasetManagementException;
void deleteModule(DatasetModuleId moduleId) throws DatasetManagementException;
// Removes all modules registered in the given namespace.
void deleteAllModules(NamespaceId namespaceId) throws DatasetManagementException;
// Instance Management
// Creates a dataset instance of the named type with the given properties.
// ownerPrincipal may be null (the usage examples below pass null when no
// Kerberos owner applies).
void addInstance(String datasetTypeName, DatasetId datasetInstanceId, DatasetProperties props,
KerberosPrincipalId ownerPrincipal) throws DatasetManagementException, IOException;
void updateInstance(DatasetId datasetInstanceId, DatasetProperties props)
throws DatasetManagementException, IOException;
void deleteInstance(DatasetId datasetInstanceId) throws DatasetManagementException, IOException;
void deleteAllInstances(NamespaceId namespaceId) throws DatasetManagementException, IOException;
// Instance Queries
Collection<DatasetSpecificationSummary> getInstances(NamespaceId namespaceId)
throws DatasetManagementException;
DatasetSpecification getDatasetSpec(DatasetId datasetInstanceId) throws DatasetManagementException;
boolean hasInstance(DatasetId datasetInstanceId) throws DatasetManagementException;
boolean hasType(DatasetTypeId datasetTypeId) throws DatasetManagementException;
DatasetTypeMeta getTypeInfo(DatasetTypeId datasetTypeId) throws DatasetManagementException;
// Dataset Access
// Instantiates the dataset for use. classLoader/classLoaderProvider control
// how the dataset code is loaded; owners and accessType feed lineage/audit
// (see writeLineage below).
<T extends Dataset> T getDataset(DatasetId datasetInstanceId, Map<String, String> arguments,
ClassLoader classLoader, DatasetClassLoaderProvider classLoaderProvider,
Iterable<? extends EntityId> owners, AccessType accessType)
throws DatasetManagementException, IOException;
// Obtains the admin object used for create/drop/truncate/upgrade of an instance.
<T extends DatasetAdmin> T getAdmin(DatasetId datasetInstanceId, ClassLoader classLoader)
throws DatasetManagementException, IOException;
// Operations
void truncateInstance(DatasetId datasetInstanceId) throws DatasetManagementException, IOException;
// Records a lineage entry for the given access; note: no checked exceptions.
void writeLineage(DatasetId datasetInstanceId, AccessType accessType);
}

Different implementations of the DatasetFramework for various deployment scenarios and requirements.
// In-memory implementation for testing
public class InMemoryDatasetFramework implements DatasetFramework {
// Fast in-memory dataset operations for unit testing
}
// Delegation wrapper for cross-cutting concerns
public class ForwardingDatasetFramework implements DatasetFramework {
// The wrapped framework that every call is forwarded to (assigned in the
// constructor). The original sketch declared `delegate()` — a body-less
// method, which is not valid in a concrete class; the delegate is a field.
protected final DatasetFramework delegate;
// Forwarding implementation allowing decoration of dataset operations
}
// Static configuration-based framework
public class StaticDatasetFramework implements DatasetFramework {
// Pre-configured dataset framework for static environments
}

Dataset instance caching for performance optimization with both single-threaded and multi-threaded implementations.
// Single-threaded dataset caching
// Caches instantiated Dataset objects so repeated lookups avoid
// re-instantiation; intended for use from a single thread.
public class SingleThreadDatasetCache implements Closeable {
// Returns a (possibly cached) dataset for the given id and runtime arguments.
public <T extends Dataset> T getDataset(DatasetId datasetId, Map<String, String> arguments,
ClassLoader classLoader, Iterable<? extends EntityId> owners,
AccessType accessType) throws IOException, DatasetInstantiationException;
// Evicts cached entries so subsequent lookups re-instantiate.
public void invalidate();
}
// Multi-threaded dataset caching with concurrent access
public class MultiThreadDatasetCache implements Closeable {
// Same contract as the single-threaded variant, plus a nullable
// ProgramContext identifying the running program — presumably used for
// lineage/audit attribution; confirm against the upstream implementation.
public <T extends Dataset> T getDataset(DatasetId datasetId, Map<String, String> arguments,
ClassLoader classLoader, Iterable<? extends EntityId> owners,
AccessType accessType, @Nullable ProgramContext programContext)
throws IOException, DatasetInstantiationException;
public void invalidateAll();
}

Dataset modules provide different storage backend implementations with consistent APIs.
// HBase-based dataset modules (distributed/production storage backend)
public class HBaseTableModule implements DatasetModule {
// HBase table dataset implementation
}
public class HBaseMetricsTableModule implements DatasetModule {
// Specialized HBase metrics table implementation
}
// LevelDB-based dataset modules (local/standalone storage backend)
public class LevelDBTableModule implements DatasetModule {
// LevelDB table dataset implementation for local storage
}
// In-memory dataset modules (no persistence)
public class InMemoryTableModule implements DatasetModule {
// In-memory table implementation for testing and caching
}
// File-based dataset modules; the three variants below register the
// FileSet family of dataset types documented in the next section.
public class FileSetModule implements DatasetModule {
// File-based dataset operations
}
public class PartitionedFileSetModule implements DatasetModule {
// Partitioned file dataset for large-scale data processing
}
public class TimePartitionedFileSetModule implements DatasetModule {
// Time-based partitioned datasets for time-series data
}

Core dataset implementations providing specific data access patterns.
// File-based datasets
// Definition: creates/configures FileSet instances and supports
// reconfiguration of existing instances (Reconfigurable).
public class FileSetDefinition implements DatasetDefinition<FileSet, FileSetAdmin>, Reconfigurable {
public FileSet getDataset(DatasetContext datasetContext, DatasetSpecification spec,
Map<String, String> arguments, ClassLoader classLoader) throws IOException;
}
// Admin: lifecycle operations on the underlying file storage.
public class FileSetAdmin implements DatasetAdmin, Updatable {
public void create() throws IOException;
public void drop() throws IOException;
public void truncate() throws IOException;
public void upgrade() throws IOException;
}
// Partitioned file set: files organized by multi-field PartitionKey,
// optionally carrying string metadata per partition.
public class PartitionedFileSetDataset extends AbstractDataset implements PartitionedFileSet {
public void addPartition(PartitionKey key, String path);
public void addPartition(PartitionKey key, String path, Map<String, String> metadata);
public PartitionDetail getPartition(PartitionKey key);
public Set<PartitionDetail> getPartitions(PartitionFilter filter);
public void dropPartition(PartitionKey key);
}
// Time-partitioned variant: partitions keyed by a long timestamp
// (epoch millis presumably — confirm upstream) instead of explicit keys.
public class TimePartitionedFileSetDataset extends PartitionedFileSetDataset
implements TimePartitionedFileSet {
public void addPartition(long time, String path);
public void addPartition(long time, String path, Map<String, String> metadata);
public PartitionDetail getPartitionByTime(long time);
public Set<PartitionDetail> getPartitionsByTime(long startTime, long endTime);
}
// Key-value tables
// "NoTx" presumably means operations bypass transaction support — confirm upstream.
public class NoTxKeyValueTable extends AbstractDataset implements KeyValueTable {
public void write(byte[] key, byte[] value);
public byte[] read(byte[] key);
public void delete(byte[] key);
// Iterates [startRow, stopRow); caller must close the iterator.
public CloseableIterator<KeyValue<byte[], byte[]>> scan(byte[] startRow, byte[] stopRow);
}
// Multi-dimensional data cubes
public class CubeDataset extends AbstractDataset implements Cube {
public void add(CubeFact fact);
public void add(Collection<? extends CubeFact> facts);
// NOTE(review): query() both accepting and returning CubeExploreQuery looks
// suspect — upstream Cube.query(CubeQuery) returns time-series results;
// verify this signature against the CDAP Cube API.
public CubeExploreQuery query(CubeExploreQuery query);
public Collection<DimensionValue> findDimensionValues(DimensionValue dimensionValue);
public Collection<String> findMeasureNames();
}
public class CubeDatasetDefinition implements DatasetDefinition<CubeDataset, CubeDatasetAdmin> {
public CubeDataset getDataset(DatasetContext datasetContext, DatasetSpecification spec,
Map<String, String> arguments, ClassLoader classLoader) throws IOException;
}
// Object storage
// Stores serialized objects of type T under byte[] keys; IOException covers
// serialization/deserialization failures.
public class ObjectStoreDataset<T> extends AbstractDataset implements ObjectStore<T> {
public void write(byte[] key, T object) throws IOException;
public T read(byte[] key) throws IOException;
public void delete(byte[] key);
public CloseableIterator<KeyValue<byte[], T>> scan(byte[] startKey, byte[] stopKey) throws IOException;
}
// Same pattern with String keys; "mapped" presumably means object fields are
// mapped to table columns (enabling SQL exploration) — confirm upstream.
public class ObjectMappedTableDataset<T> extends AbstractDataset implements ObjectMappedTable<T> {
public void write(String key, T object) throws IOException;
public T read(String key) throws IOException;
public void delete(String key);
public CloseableIterator<KeyValue<String, T>> scan(String startKey, String stopKey) throws IOException;
}
// Metrics tables
// Low-level row/column store of long counters; rows and columns are raw byte[].
public interface MetricsTable {
// Writes column -> value updates per row.
void put(SortedMap<byte[], SortedMap<byte[], Long>> updates);
// Atomic compare-and-swap of one cell; returns whether the swap happened.
boolean swap(byte[] row, byte[] column, byte[] oldValue, byte[] newValue);
void increment(byte[] row, Map<byte[], Long> increments);
void increment(SortedMap<byte[], SortedMap<byte[], Long>> updates);
Scanner scan(byte[] startRow, byte[] stopRow);
}

// Create dataset framework instance (typically injected)
DatasetFramework datasetFramework = // ... obtain from dependency injection
// Define dataset properties (here: HBase-specific tuning keys)
DatasetProperties properties = DatasetProperties.builder()
.add("hbase.splits", "10")
.add("hbase.compression", "SNAPPY")
.build();
// Create dataset instance; the trailing null is the KerberosPrincipalId
// owner (no Kerberos owner in this example)
DatasetId datasetId = NamespaceId.DEFAULT.dataset("userProfiles");
datasetFramework.addInstance("keyValueTable", datasetId, properties, null);
// Access dataset for operations
KeyValueTable dataset = datasetFramework.getDataset(
datasetId,
Collections.emptyMap(), // runtime arguments
null, // classloader
null, // classloader provider
Collections.emptyList(), // owners
AccessType.READ_WRITE
);
// Use the dataset; try/finally guarantees the dataset is closed even if an
// operation throws
try {
dataset.write(Bytes.toBytes("user123"), Bytes.toBytes("profile_data"));
byte[] data = dataset.read(Bytes.toBytes("user123"));
} finally {
dataset.close();
}

// Access partitioned file set
// nulls here stand in for arguments, classloader, provider, and owners
PartitionedFileSetDataset partitionedDataset = datasetFramework.getDataset(
datasetId, null, null, null, null, AccessType.READ_WRITE);
// Add partitions with metadata; the key fields form the partition identity
PartitionKey key = PartitionKey.builder()
.addStringField("year", "2023")
.addStringField("month", "01")
.addStringField("day", "15")
.build();
// Free-form string metadata attached to the partition
Map<String, String> metadata = Map.of(
"format", "parquet",
"compression", "snappy",
"records", "1000000"
);
partitionedDataset.addPartition(key, "/data/2023/01/15", metadata);
// Query partitions; range conditions are inclusive per field
// (presumably [lo, hi] — confirm against PartitionFilter docs)
PartitionFilter filter = PartitionFilter.builder()
.addRangeCondition("year", "2023", "2023")
.addRangeCondition("month", "01", "03")
.build();
Set<PartitionDetail> partitions = partitionedDataset.getPartitions(filter);
for (PartitionDetail partition : partitions) {
System.out.println("Partition: " + partition.getRelativePath());
System.out.println("Metadata: " + partition.getMetadata());
}

// Add custom dataset module
// Register a custom dataset module under the default namespace
DatasetModuleId moduleId = NamespaceId.DEFAULT.datasetModule("customModule");
DatasetModule customModule = new CustomDatasetModule();
datasetFramework.addModule(moduleId, customModule);
// Create instance of custom dataset type (type name must be one the module registered)
datasetFramework.addInstance("customType", datasetId, properties, null);
// Check if dataset type exists
DatasetTypeId typeId = NamespaceId.DEFAULT.datasetType("customType");
boolean exists = datasetFramework.hasType(typeId);
// Get type information (name plus the modules that provide the type)
DatasetTypeMeta typeMeta = datasetFramework.getTypeInfo(typeId);
System.out.println("Type: " + typeMeta.getName());
System.out.println("Modules: " + typeMeta.getModules());

// Dataset identifiers and specifications
// Identifies a dataset instance within a namespace.
public final class DatasetId extends EntityId {
public static DatasetId of(String namespace, String dataset);
public String getDataset();
public NamespaceId getParent();
}
// Identifies a dataset module within a namespace.
public final class DatasetModuleId extends EntityId {
public static DatasetModuleId of(String namespace, String module);
public String getModule();
}
// Identifies a dataset type within a namespace.
public final class DatasetTypeId extends EntityId {
public static DatasetTypeId of(String namespace, String type);
public String getType();
}
// Full specification of a created instance, including specifications of
// embedded (composite) datasets.
public interface DatasetSpecification {
String getName();
String getType();
DatasetProperties getProperties();
Map<String, DatasetSpecification> getSpecifications();
}
// Lightweight summary returned by DatasetFramework.getInstances().
public final class DatasetSpecificationSummary {
public String getName();
public String getType();
public String getDescription();
}
// Dataset properties and configuration (immutable; built via Builder)
public final class DatasetProperties {
public static Builder builder();
public Map<String, String> getProperties();
public static class Builder {
public Builder add(String key, String value);
public Builder addAll(Map<String, String> properties);
public DatasetProperties build();
}
}
// Dataset type metadata
public final class DatasetTypeMeta {
public String getName();
public List<DatasetModuleMeta> getModules();
}
// Metadata about a registered module: implementation class, jar, and the
// dataset types it contributes.
public final class DatasetModuleMeta {
public String getName();
public String getClassName();
public String getJarLocation();
public List<String> getTypes();
}
// Dataset interfaces
// Base interface for all datasets; Closeable so callers release resources
// (see the try/finally in the usage example above).
public interface Dataset extends Closeable {
// Base dataset interface
}
// Administrative lifecycle operations on a dataset instance.
public interface DatasetAdmin {
void create() throws IOException;
void drop() throws IOException;
void truncate() throws IOException;
void upgrade() throws IOException;
void close() throws IOException;
}
// Access and security types
// Access mode recorded for lineage/audit (see DatasetFramework.writeLineage).
public enum AccessType {
READ, WRITE, ADMIN, READ_WRITE, UNKNOWN
}
// Kerberos principal owning a dataset instance; may be passed as null to
// addInstance when no owner applies.
public final class KerberosPrincipalId {
public static KerberosPrincipalId of(String principal);
public String getPrincipal();
}
// Exception types
// Checked exception for module/instance management failures.
public class DatasetManagementException extends Exception {
public DatasetManagementException(String message);
public DatasetManagementException(String message, Throwable cause);
}
// NOTE(review): upstream CDAP declares DatasetInstantiationException as a
// RuntimeException — verify whether "extends Exception" here is accurate.
public class DatasetInstantiationException extends Exception {
public DatasetInstantiationException(String message);
public DatasetInstantiationException(String message, Throwable cause);
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-data-fabric