Core data management capabilities for CDAP including dataset operations, metadata management, lineage tracking, audit functionality, and data registry services for Hadoop-based applications.
—
Complete metadata management system for properties, tags, search, and indexing with support for custom indexing strategies and historical snapshots. The MetadataDataset provides comprehensive metadata operations for any entity in the CDAP system with efficient search and retrieval capabilities.
`MetadataDataset` is the primary interface for all metadata management operations, providing property and tag management with complete CRUD operations.
/**
 * Primary interface for metadata management operations: property and tag
 * management, complete metadata retrieval, search, historical snapshots and
 * index maintenance.
 *
 * <p>Mutating calls return a {@code MetadataChange}, which exposes the metadata
 * through {@code getBefore()} and {@code getAfter()}.
 */
public class MetadataDataset extends AbstractDataset {

    // Property Management

    /** Sets the property {@code key} to {@code value} on the given entity. */
    public MetadataChange setProperty(MetadataEntity metadataEntity, String key, String value);

    /** Sets every key/value pair of {@code properties} on the given entity. */
    public MetadataChange setProperty(MetadataEntity metadataEntity, Map<String, String> properties);

    /** Returns the properties of the given entity as a key/value map. */
    public Map<String, String> getProperties(MetadataEntity metadataEntity);

    /** Returns the value stored under {@code key} for the given entity. */
    public String getProperty(MetadataEntity metadataEntity, String key);

    /** Removes the properties named in {@code keys} from the given entity. */
    public MetadataChange removeProperties(MetadataEntity metadataEntity, Set<String> keys);

    /** Overload without keys; presumably removes all properties of the entity (TODO confirm). */
    public MetadataChange removeProperties(MetadataEntity metadataEntity);

    // Tag Management

    /** Adds the tags in {@code tagsToAdd} to the given entity. */
    public MetadataChange addTags(MetadataEntity metadataEntity, Set<String> tagsToAdd);

    /** Returns the tags of the given entity. */
    public Set<String> getTags(MetadataEntity metadataEntity);

    /** Removes the tags in {@code tagsToRemove} from the given entity. */
    public MetadataChange removeTags(MetadataEntity metadataEntity, Set<String> tagsToRemove);

    /** Overload without tags; presumably removes all tags of the entity (TODO confirm). */
    public MetadataChange removeTags(MetadataEntity metadataEntity);

    // Complete Metadata Retrieval

    /** Returns the metadata (properties and tags) of a single entity. */
    public Metadata getMetadata(MetadataEntity metadataEntity);

    /** Batch variant: returns metadata for a set of entities. */
    public Set<Metadata> getMetadata(Set<MetadataEntity> metadataEntities);

    // Search Operations

    /**
     * Runs the search described by {@code request}.
     *
     * @throws BadRequestException declared by this listing; the triggering conditions are not shown here
     */
    public SearchResults search(SearchRequest request) throws BadRequestException;

    // Historical Operations

    /**
     * Returns metadata snapshots for the given entities relative to {@code timeMillis}.
     * The usage example passes a value derived from {@code System.currentTimeMillis()}.
     */
    public Set<Metadata> getSnapshotBeforeTime(Set<MetadataEntity> metadataEntities, long timeMillis);

    // Index Management

    /**
     * Rebuilds indexes for one batch.
     *
     * @param startRowKey row key to start from; the usage example passes {@code null} for the first batch
     * @param limit batch size
     */
    public MetadataChange rebuildIndexes(byte[] startRowKey, int limit);

    /** Deletes indexes; {@code limit} presumably bounds the work done per call (TODO confirm). */
    public void deleteAllIndexes(int limit);
}

High-level metadata operations interface providing scope-based metadata management for both system and user metadata with comprehensive CRUD operations.
/**
 * High-level, scope-based metadata operations. Most methods take a
 * {@code MetadataScope} ({@code USER} or {@code SYSTEM}) selecting which
 * metadata they act on.
 */
public interface MetadataStore {

    // Property Management

    /** Sets every key/value pair of {@code properties} on the entity in the given scope. */
    void setProperties(MetadataScope scope, MetadataEntity metadataEntity, Map<String, String> properties);

    /** Sets the property {@code key} to {@code value} on the entity in the given scope. */
    void setProperty(MetadataScope scope, MetadataEntity metadataEntity, String key, String value);

    /** Overload without a scope; presumably spans all scopes (TODO confirm). */
    Map<String, String> getProperties(MetadataEntity metadataEntity);

    /** Returns the properties of the entity in the given scope. */
    Map<String, String> getProperties(MetadataScope scope, MetadataEntity metadataEntity);

    /** Overload without keys; presumably removes all properties in the scope (TODO confirm). */
    void removeProperties(MetadataScope scope, MetadataEntity metadataEntity);

    /** Removes the properties named in {@code keys} from the entity in the given scope. */
    void removeProperties(MetadataScope scope, MetadataEntity metadataEntity, Set<String> keys);

    // Tag Management

    /** Adds the tags in {@code tagsToAdd} to the entity in the given scope. */
    void addTags(MetadataScope scope, MetadataEntity metadataEntity, Set<String> tagsToAdd);

    /** Overload without a scope; presumably spans all scopes (TODO confirm). */
    Set<String> getTags(MetadataEntity metadataEntity);

    /** Returns the tags of the entity in the given scope. */
    Set<String> getTags(MetadataScope scope, MetadataEntity metadataEntity);

    /** Overload without tags; presumably removes all tags in the scope (TODO confirm). */
    void removeTags(MetadataScope scope, MetadataEntity metadataEntity);

    /** Removes the tags in {@code tagsToRemove} from the entity in the given scope. */
    void removeTags(MetadataScope scope, MetadataEntity metadataEntity, Set<String> tagsToRemove);

    // Complete Metadata Operations

    /** Overload without a scope; returns a set of records, presumably one per scope (TODO confirm). */
    Set<MetadataRecordV2> getMetadata(MetadataEntity metadataEntity);

    /** Returns the metadata record of the entity in the given scope. */
    MetadataRecordV2 getMetadata(MetadataScope scope, MetadataEntity metadataEntity);

    /** Batch variant: returns metadata records for a set of entities in the given scope. */
    Set<MetadataRecordV2> getMetadata(MetadataScope scope, Set<MetadataEntity> metadataEntities);

    /** Overload without a scope; presumably removes metadata in all scopes (TODO confirm). */
    void removeMetadata(MetadataEntity metadataEntity);

    /** Removes the metadata of the entity in the given scope. */
    void removeMetadata(MetadataScope scope, MetadataEntity metadataEntity);

    // Search Operations

    /** Runs the search described by {@code request}. */
    MetadataSearchResponseV2 search(SearchRequest request);

    // Historical Operations

    /** Returns metadata records for the given entities and scope relative to {@code timeMillis}. */
    Set<MetadataRecordV2> getSnapshotBeforeTime(MetadataScope scope, Set<MetadataEntity> metadataEntities, long timeMillis);

    // Administrative Operations

    /** Rebuilds the indexes of the given scope, using {@code retryStrategy}. */
    void rebuildIndexes(MetadataScope scope, RetryStrategy retryStrategy);

    /**
     * Creates or upgrades the backing storage for the given scope.
     *
     * @throws DatasetManagementException declared by this listing; conditions not shown here
     * @throws IOException declared by this listing; conditions not shown here
     */
    void createOrUpgrade(MetadataScope scope) throws DatasetManagementException, IOException;
}

Pluggable indexing strategies for efficient metadata search and retrieval across different data access patterns.
// Base indexing interface
// Strategy contract: every indexer turns one MetadataEntry into a set of index strings.
public interface Indexer {
// Returns the index strings generated for the given entry.
Set<String> getIndexes(MetadataEntry entry);
}
// Standard indexing implementations
// NOTE(review): each class below declares `implements Indexer` but this listing shows no
// members; a concrete class must implement getIndexes(MetadataEntry).

// Indexer for exact-match lookups on values.
public class DefaultValueIndexer implements Indexer {
// Standard value-based indexing for exact matches
}
// Indexer for structured (schema) metadata.
public class SchemaIndexer implements Indexer {
// Schema-aware indexing for structured metadata
}
// Indexer producing reversed value indexes, described here as serving range queries and sorting.
public class InvertedValueIndexer implements Indexer {
// Reverse indexing for efficient range queries and sorting
}
// Indexer producing reversed time indexes, described here as serving temporal queries.
public class InvertedTimeIndexer implements Indexer {
// Time-based reverse indexing for temporal queries
}
// Indexer that indexes the value without the key.
public class ValueOnlyIndexer implements Indexer {
// Value-only indexing without key information
}
// Indexer keyed on the entity type.
// NOTE(review): no members are listed; a concrete class must implement getIndexes(MetadataEntry).
public class MetadataEntityTypeIndexer implements Indexer {
// Entity type-based indexing for type-specific queries
}
Comprehensive search capabilities with flexible query parameters, sorting, and pagination support.
// Search request configuration
// Instances are obtained through builder() and Builder.build(); the getters below
// mirror the Builder setters one-to-one.
// NOTE(review): SearchResults exposes getCursor(), but no setter here accepts a cursor —
// confirm how a cursor is meant to be supplied to a follow-up request.
public class SearchRequest {
// Entry point for constructing a request.
public static Builder builder();
// Query string; the examples use forms such as "tags:critical" and "properties:environment:production".
public String getQuery();
// Metadata scopes the search is restricted to.
public Set<MetadataScope> getScopes();
// Entity types the search is restricted to.
public Set<EntityTypeSimpleName> getTypes();
// Sort field and direction.
public SortInfo getSortInfo();
// Pagination: offset and limit as configured on the builder.
public int getOffset();
public int getLimit();
// Whether hidden entries are included, as configured through setShowHidden.
public boolean shouldShowHidden();
// Field names configured through setCursorRequiredFields.
public Set<String> getCursorRequiredFields();
// Fluent builder: every setter returns a Builder so calls can be chained, as the examples do.
public static class Builder {
public Builder setQuery(String query);
public Builder setScopes(Set<MetadataScope> scopes);
public Builder setTypes(Set<EntityTypeSimpleName> types);
public Builder setSortInfo(SortInfo sortInfo);
public Builder setOffset(int offset);
public Builder setLimit(int limit);
public Builder setShowHidden(boolean showHidden);
public Builder setCursorRequiredFields(Set<String> fields);
// Produces the SearchRequest from the values set so far.
public SearchRequest build();
}
}
// Search results with pagination
// Returned by MetadataDataset.search(SearchRequest).
public class SearchResults {
// The records of this result page.
public List<MetadataSearchResultRecord> getResults();
// Cursor string; the pagination example treats null as "no further pages".
public String getCursor();
// Total count; presumably the number of matches across all pages — TODO confirm.
public int getTotal();
// Whether more results are available beyond this page.
public boolean hasMore();
}
// Search result record structure
// One search hit: the matched entity together with its metadata.
public class MetadataSearchResultRecord {
// The entity that matched the query.
public MetadataEntity getMetadataEntity();
// The metadata of that entity; the examples read getProperties() from it.
public Metadata getMetadata();
}
// Sorting configuration
// Pairs a sort field name with a sort direction.
// NOTE(review): the search example in this document calls
// new SortInfo("entity_name", SortInfo.SortOrder.ASC), but no constructor is listed here.
public class SortInfo {
// Name of the field to sort by (the example uses "entity_name").
public String getSortBy();
// Direction of the sort.
public SortOrder getSortOrder();
// Supported sort directions.
public enum SortOrder {
ASC, DESC
}
}
// Access metadata dataset (typically injected)
MetadataDataset metadataDataset = // ... obtain instance

// Entity whose metadata is managed below
MetadataEntity entity = MetadataEntity.ofDataset(NamespaceId.DEFAULT, "userProfiles");

// Attach several properties in one call and report the resulting state
Map<String, String> initialProperties = Map.of(
    "environment", "production",
    "owner", "team-alpha",
    "created", "2023-01-15",
    "format", "parquet",
    "compression", "snappy"
);
MetadataChange propertyChange = metadataDataset.setProperty(entity, initialProperties);
Metadata afterChange = propertyChange.getAfter();
System.out.println("Properties added: " + afterChange.getProperties());

// Tag the entity
metadataDataset.addTags(entity, Set.of("production", "critical", "team-alpha", "analytics"));

// Read back everything stored for the entity
Metadata current = metadataDataset.getMetadata(entity);
System.out.println("Properties: " + current.getProperties());
System.out.println("Tags: " + current.getTags());

// Set one more property individually
metadataDataset.setProperty(entity, "last_updated", "2023-06-20");

// Drop a single tag
Set<String> obsoleteTags = Set.of("analytics");
metadataDataset.removeTags(entity, obsoleteTags);
// Search by property value
SearchRequest request = SearchRequest.builder()
    .setQuery("properties:environment:production")
    .setTypes(Set.of(EntityTypeSimpleName.DATASET))
    .setLimit(50)
    .build();
SearchResults results = metadataDataset.search(request);

// Print each matching dataset together with its "environment" property
for (MetadataSearchResultRecord hit : results.getResults()) {
    System.out.println("Dataset: " + hit.getMetadataEntity());
    Map<String, String> hitProperties = hit.getMetadata().getProperties();
    System.out.println("Environment: " + hitProperties.get("environment"));
}

// Tag search, sorted ascending by entity name, first 100 results
SortInfo byEntityName = new SortInfo("entity_name", SortInfo.SortOrder.ASC);
SearchRequest tagSearch = SearchRequest.builder()
    .setQuery("tags:critical")
    .setSortInfo(byEntityName)
    .setOffset(0)
    .setLimit(100)
    .build();
SearchResults tagResults = metadataDataset.search(tagSearch);

// Combined property + tag query across datasets and applications
Set<EntityTypeSimpleName> datasetsAndApps =
    Set.of(EntityTypeSimpleName.DATASET, EntityTypeSimpleName.APPLICATION);
SearchRequest complexSearch = SearchRequest.builder()
    .setQuery("(properties:owner:team-alpha) AND (tags:production)")
    .setTypes(datasetsAndApps)
    .build();
SearchResults complexResults = metadataDataset.search(complexSearch);
// Paginated search: walk the result set one page at a time.
// Every request must differ from the previous one, otherwise the same first page is
// fetched on each iteration and the loop never finishes. The offset is therefore
// advanced by the page size after each page, and the loop stops once the results
// report that nothing more is available.
// NOTE(review): SearchResults also exposes getCursor(), but the SearchRequest builder
// listed in this document has no way to pass a cursor back in, so offset/limit paging
// is used here — confirm whether a cursor-based variant exists.
final int pageSize = 20;
int offset = 0;
SearchResults page;
do {
    SearchRequest paginatedRequest = SearchRequest.builder()
        .setQuery("tags:analytics")
        .setOffset(offset)
        .setLimit(pageSize)
        .build();
    page = metadataDataset.search(paginatedRequest);
    // processResults is a caller-supplied handler; it is not defined in this document.
    processResults(page.getResults());
    offset += pageSize;
} while (page.hasMore());
// Get historical snapshot of metadata
MetadataEntity firstDataset = MetadataEntity.ofDataset(NamespaceId.DEFAULT, "dataset1");
MetadataEntity secondDataset = MetadataEntity.ofDataset(NamespaceId.DEFAULT, "dataset2");
Set<MetadataEntity> entities = Set.of(firstDataset, secondDataset);

// Timestamp 24 hours before now, in milliseconds
long oneDayMillis = 24 * 60 * 60 * 1000;
long yesterday = System.currentTimeMillis() - oneDayMillis;

// Fetch and print the metadata as of that timestamp
Set<Metadata> historicalMetadata = metadataDataset.getSnapshotBeforeTime(entities, yesterday);
for (Metadata snapshot : historicalMetadata) {
    System.out.println("Historical properties: " + snapshot.getProperties());
    System.out.println("Historical tags: " + snapshot.getTags());
}

// Fetch the present-day metadata of the same entities for comparison
for (MetadataEntity entity : entities) {
    Metadata current = metadataDataset.getMetadata(entity);
    // Compare current vs historical...
}
// Rebuild indexes for performance optimization
// Process in batches to avoid memory issues
// A null start key is used for the first batch.
byte[] startRowKey = null;
int batchSize = 1000;
do {
// NOTE(review): the returned MetadataChange is never read in this example.
MetadataChange indexRebuild = metadataDataset.rebuildIndexes(startRowKey, batchSize);
System.out.println("Rebuilt indexes for batch");
// Update startRowKey for next batch based on processing
// NOTE(review): getNextBatchStartKey() is not defined anywhere in this document; the caller
// must supply it, and it must eventually return null or this loop never terminates.
startRowKey = getNextBatchStartKey();
} while (startRowKey != null);
// Delete all indexes (typically for maintenance)
metadataDataset.deleteAllIndexes(1000);
// Efficient batch metadata retrieval
Set<MetadataEntity> entities = Set.of(
    MetadataEntity.ofDataset(NamespaceId.DEFAULT, "dataset1"),
    MetadataEntity.ofDataset(NamespaceId.DEFAULT, "dataset2"),
    MetadataEntity.ofDataset(NamespaceId.DEFAULT, "dataset3")
);

// One call retrieves the metadata of every entity in the set
Set<Metadata> batchResults = metadataDataset.getMetadata(entities);
for (Metadata entityMetadata : batchResults) {
    System.out.println("Entity metadata: " + entityMetadata);
}

// Stamp each entity with a processed flag and the current time
for (MetadataEntity entity : entities) {
    String processedAt = String.valueOf(System.currentTimeMillis());
    Map<String, String> batchProperties = Map.of(
        "batch_processed", "true",
        "processed_time", processedAt
    );
    metadataDataset.setProperty(entity, batchProperties);
}
// Core metadata structures
// Metadata of one entity: its properties and tags, plus the entity and scope they belong to.
public final class Metadata {
// The entity this metadata describes.
public MetadataEntity getEntity();
// Properties as a key/value map.
public Map<String, String> getProperties();
// Tags as a set of strings.
public Set<String> getTags();
// Scope of this metadata (USER or SYSTEM).
public MetadataScope getScope();
}
// A single metadata item: a key, a value and its kind (PROPERTY or TAG).
// This is the input type of Indexer.getIndexes.
public final class MetadataEntry {
public String getKey();
public String getValue();
public MetadataKind getKind();
}
// Result of a mutating MetadataDataset call: exposes a "before" and an "after" Metadata.
// The usage example reads getAfter().getProperties() after setting properties.
public final class MetadataChange {
public Metadata getBefore();
public Metadata getAfter();
}
// Identifies one metadata item by entity, key and kind (PROPERTY or TAG).
// NOTE(review): presumably the decoded form of a storage row key — confirm against the implementation.
public final class MetadataKey {
public MetadataEntity getEntity();
public String getKey();
public MetadataKind getKind();
}
// Entity identification
// Identifies the thing metadata is attached to, by type and value.
// NOTE(review): the static factory methods are listed without bodies; as written this is a
// signature summary, not compilable Java (static interface methods require a body).
public interface MetadataEntity {
// Type of the entity.
EntityType getType();
// String value of the entity.
String getValue();
// Factory methods for common entity types
public static MetadataEntity ofDataset(NamespaceId namespaceId, String dataset);
public static MetadataEntity ofApplication(ApplicationId applicationId);
public static MetadataEntity ofProgram(ProgramId programId);
public static MetadataEntity ofStream(StreamId streamId);
}
// Metadata scopes
// Selects which metadata a MetadataStore call acts on; also used to filter searches
// (SearchRequest.getScopes).
public enum MetadataScope {
USER, // User-defined metadata
SYSTEM // System-generated metadata
}
// Metadata kinds
// Distinguishes the two kinds of metadata items carried by MetadataEntry and MetadataKey.
public enum MetadataKind {
PROPERTY,
TAG
}
// Entity types for search filtering
// Passed to SearchRequest.Builder.setTypes to restrict a search to the listed entity types.
public enum EntityTypeSimpleName {
DATASET,
APPLICATION,
PROGRAM,
STREAM,
VIEW,
NAMESPACE,
ARTIFACT
}
// Exception types
// Checked exception (extends Exception) for metadata operations.
// NOTE(review): no method listed in this document declares it — confirm where it is thrown.
public class MetadataException extends Exception {
// Creates the exception with a detail message.
public MetadataException(String message);
// Creates the exception with a detail message and the underlying cause.
public MetadataException(String message, Throwable cause);
}
// Checked exception (extends Exception) declared by MetadataDataset.search(SearchRequest).
public class BadRequestException extends Exception {
// Creates the exception with a detail message.
public BadRequestException(String message);
// Creates the exception with a detail message and the underlying cause.
public BadRequestException(String message, Throwable cause);
}
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-data-fabric