The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.
—
CDAP provides comprehensive security and metadata management capabilities for enterprise data governance. These features enable secure storage of credentials, comprehensive metadata tracking, data lineage recording, and access control across all application components.
The Secure Store provides encrypted, centralized storage for sensitive configuration data such as passwords, API keys, and certificates.
import io.cdap.cdap.api.security.store.*;

import java.nio.charset.StandardCharsets;
// Read-only interface for retrieving secrets from the secure store.
@Beta
public interface SecureStore {

  /**
   * Lists metadata for every secret stored in the given namespace.
   *
   * @param namespace namespace to list
   * @return metadata for all stored secrets (no secret bytes)
   * @throws Exception if the listing fails
   */
  List<SecureStoreMetadata> list(String namespace) throws Exception;

  /**
   * Retrieves a secret's data together with its metadata.
   *
   * @param namespace namespace containing the secret
   * @param name name of the secret
   * @throws Exception if the secret does not exist or cannot be read
   */
  SecureStoreData get(String namespace, String name) throws Exception;

  /**
   * Retrieves only the metadata of a secret.
   *
   * <p>BUG FIX: the previous default returned the {@code SecureStoreData}
   * itself, which is not a {@code SecureStoreMetadata} and would not compile;
   * extract the metadata from the retrieved data instead (as in the CDAP API).
   */
  default SecureStoreMetadata getMetadata(String namespace, String name) throws Exception {
    return get(namespace, name).getMetadata();
  }

  /**
   * Retrieves only the raw secret bytes.
   */
  default byte[] getData(String namespace, String name) throws Exception {
    return get(namespace, name).get();
  }
}
// Write access to the secure store: create/update and delete secrets.
// Extends SecureStore, so implementers also provide read access.
@Beta
public interface SecureStoreManager extends SecureStore {
/**
 * Stores (or overwrites) a secret.
 *
 * @param namespace namespace to store the secret in
 * @param name secret name (key used for later retrieval)
 * @param data the secret value itself
 * @param description human-readable description
 * @param properties arbitrary key/value properties attached to the secret
 * @throws Exception if the secret cannot be written
 */
void put(String namespace, String name, String data, String description,
Map<String, String> properties) throws Exception;
/**
 * Deletes a secret.
 *
 * @throws Exception if the secret does not exist or cannot be deleted
 */
void delete(String namespace, String name) throws Exception;
}
// Secure store data container
public class SecureStoreData {
public byte[] get() { /* returns secret data */ }
public String getName() { /* returns secret name */ }
public String getDescription() { /* returns description */ }
public long getCreationTimeMs() { /* returns creation timestamp */ }
public Map<String, String> getProperties() { /* returns properties */ }
}
// Metadata describing a stored secret: identity, description, timestamps,
// and arbitrary key/value properties. Contains no secret material.
public class SecureStoreMetadata {
/** Returns the secret name. */
public String getName() { /* returns secret name */ }
/** Returns the human-readable description. */
public String getDescription() { /* returns description */ }
/** Returns the creation time in epoch milliseconds. */
public long getCreationTimeMs() { /* returns creation timestamp */ }
/** Returns the last-modified time in epoch milliseconds. */
public long getLastModifiedTimeMs() { /* returns last modified timestamp */ }
/** Returns the arbitrary key/value properties attached to the secret. */
public Map<String, String> getProperties() { /* returns properties */ }
}
// Using secure store in applications
// Example application whose programs read credentials from the secure store.
public class DatabaseConnectorApp extends AbstractApplication {

  /** Declares the application's identity and its two programs. */
  @Override
  public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
    // Identify the application.
    configurer.setName("DatabaseConnector");
    configurer.setDescription("Connects to external databases securely");

    // Batch extraction job that pulls database credentials from the secure store.
    configurer.addMapReduce(new SecureDataExtractionMapReduce());

    // HTTP service that authenticates to an external API with stored keys.
    configurer.addService(new SecureAPIService());
  }
}
// MapReduce program that reads database credentials from the secure store at
// initialization time and hands them to its tasks through the job configuration.
public class SecureDataExtractionMapReduce extends AbstractMapReduce {

  /**
   * Configures the Hadoop job: resolves database credentials from the secure
   * store, wires input/output datasets, and sets the mapper class.
   *
   * @param context the MapReduce runtime context
   * @throws Exception if a secret cannot be retrieved or job setup fails
   */
  @Override
  public void initialize(MapReduceContext context) throws Exception {
    Job job = context.getHadoopJob();

    // The program Admin also acts as a read-only SecureStore.
    SecureStore secureStore = context.getAdmin();
    SecureStoreData dbPassword = secureStore.get(context.getNamespace(), "db-password");
    SecureStoreData dbUsername = secureStore.get(context.getNamespace(), "db-username");
    SecureStoreData dbUrl = secureStore.get(context.getNamespace(), "db-connection-url");

    // NOTE(review): values placed in the Hadoop Configuration are visible in
    // plain text to anyone who can inspect the job (UI, history server,
    // serialized conf). For production, prefer the Hadoop Credentials API or
    // have each task read the secure store directly.
    Configuration conf = job.getConfiguration();
    // Decode secret bytes with an explicit charset; the bare new String(byte[])
    // constructor uses the platform default and can corrupt credentials on
    // nodes with a non-UTF-8 default encoding.
    conf.set("db.url", new String(dbUrl.get(), StandardCharsets.UTF_8));
    conf.set("db.username", new String(dbUsername.get(), StandardCharsets.UTF_8));
    conf.set("db.password", new String(dbPassword.get(), StandardCharsets.UTF_8));

    // Dataset wiring.
    context.setInput(Input.ofDataset("source_data"));
    context.setOutput(Output.ofDataset("extracted_data"));
    job.setMapperClass(SecureDataMapper.class);
  }

  /**
   * Mapper that enriches each input row with data looked up from an external
   * JDBC database, using the credentials placed in the job configuration.
   */
  public static class SecureDataMapper extends Mapper<byte[], Row, byte[], Put> {
    private Connection dbConnection;

    /** Opens the JDBC connection once per task from the job configuration. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      try {
        String url = conf.get("db.url");
        String username = conf.get("db.username");
        String password = conf.get("db.password");
        dbConnection = DriverManager.getConnection(url, username, password);
      } catch (SQLException e) {
        throw new IOException("Failed to connect to database", e);
      }
    }

    /**
     * Looks up enrichment data for the row's id (parameterized query, which
     * also prevents SQL injection) and emits the enriched record. Rows with
     * no matching lookup entry are silently dropped.
     */
    @Override
    protected void map(byte[] key, Row row, Context context)
        throws IOException, InterruptedException {
      try {
        String query = "SELECT enrichment_data FROM lookup_table WHERE id = ?";
        try (PreparedStatement stmt = dbConnection.prepareStatement(query)) {
          stmt.setString(1, row.getString("id"));
          try (ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
              Put put = new Put(key);
              put.add("original", row.getColumns());
              put.add("enriched_data", rs.getString("enrichment_data"));
              put.add("enrichment_timestamp", System.currentTimeMillis());
              context.write(key, put);
            }
          }
        }
      } catch (SQLException e) {
        throw new IOException("Database operation failed", e);
      }
    }

    /** Closes the JDBC connection; a close failure is logged, not fatal. */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      if (dbConnection != null) {
        try {
          dbConnection.close();
        } catch (SQLException e) {
          // Log error but don't fail the job
          LOG.warn("Failed to close database connection", e);
        }
      }
    }
  }
}
// HTTP service handler that authenticates to an external API with credentials
// read from the secure store when the handler starts.
@Path("/api")
public class SecureAPIService extends AbstractHttpServiceHandler {
  private String apiKey;
  private String apiSecret;

  /** Loads the external API credentials once, at handler initialization. */
  @Override
  public void initialize(HttpServiceContext context) throws Exception {
    super.initialize(context);
    SecureStore secureStore = context.getAdmin();
    // Decode with an explicit charset; new String(byte[]) depends on the
    // platform default encoding and can corrupt non-ASCII key material.
    apiKey = new String(
        secureStore.getData(context.getNamespace(), "external-api-key"),
        StandardCharsets.UTF_8);
    apiSecret = new String(
        secureStore.getData(context.getNamespace(), "external-api-secret"),
        StandardCharsets.UTF_8);
  }

  /**
   * Fetches data for the given id from the external API and relays it as JSON.
   * Failures are logged with the id but reported to the client without detail.
   */
  @GET
  @Path("/external-data/{id}")
  public void fetchExternalData(HttpServiceRequest request, HttpServiceResponder responder,
                                @PathParam("id") String id) {
    try {
      String externalData = callExternalAPI(id, apiKey, apiSecret);
      responder.sendJson(200, externalData);
    } catch (Exception e) {
      LOG.error("Failed to fetch external data for ID: {}", id, e);
      responder.sendError(500, "Failed to fetch external data");
    }
  }

  /** Performs the authenticated call to the external service. */
  private String callExternalAPI(String id, String key, String secret) throws IOException {
    // Implementation for secure external API calls
    // Use HTTPS, proper authentication headers, etc.
    return "{}"; // Placeholder
  }
}
// Custom action that seeds the secure store with the credentials the other
// programs in this application expect to find at runtime.
public class SecureStoreManagementAction extends AbstractCustomAction {

  /** Writes the database and external-API secrets, then records a metric. */
  @Override
  public void run(CustomActionContext context) throws Exception {
    SecureStoreManager storeManager = context.getAdmin();
    String namespace = context.getNamespace();

    // Shared properties for every database credential.
    Map<String, String> databaseProps = new HashMap<>();
    databaseProps.put("environment", "production");
    databaseProps.put("database_type", "postgresql");

    // Database credentials. In a real deployment these values come from the
    // deployment configuration — never from source code.
    storeManager.put(namespace, "db-username", "prod_db_user",
        "Production database username", databaseProps);
    storeManager.put(namespace, "db-password", "secure_password_123",
        "Production database password", databaseProps);
    storeManager.put(namespace, "db-connection-url",
        "jdbc:postgresql://prod-db:5432/main",
        "Production database URL", databaseProps);

    // Shared properties for the external-API credentials.
    Map<String, String> analyticsProps = new HashMap<>();
    analyticsProps.put("service", "external-analytics");
    analyticsProps.put("access_level", "read");

    storeManager.put(namespace, "external-api-key", "api_key_xyz789",
        "External analytics API key", analyticsProps);
    storeManager.put(namespace, "external-api-secret", "secret_abc456",
        "External analytics API secret", analyticsProps);

    // One increment per secret written above.
    context.getMetrics().count("secure_store.secrets_configured", 5);
  }
}
import io.cdap.cdap.api.security.AccessException;
/**
 * Exception thrown when a caller attempts an operation it is not authorized
 * to perform.
 */
public class AccessException extends Exception {

  /** Creates an exception with a human-readable reason. */
  public AccessException(String message) {
    super(message);
  }

  /** Creates an exception with a reason and the underlying cause. */
  public AccessException(String message, Throwable cause) {
    super(message, cause);
  }
}
// HTTP service that gates access to a user-data table: it authenticates the
// caller, then filters the returned fields by the caller's role.
public class SecurityAwareService extends AbstractHttpServiceHandler {
/**
 * Returns the requesting user's record, with sensitive fields removed
 * according to role. Responds 403 on auth failure, 404 if the user record
 * is missing, 500 on any other error.
 */
@GET
@Path("/secure-data")
public void getSecureData(HttpServiceRequest request, HttpServiceResponder responder) {
try {
// Validate user permissions
validateUserAccess(request);
// Access secure data
String userId = request.getHeader("X-User-ID");
Table userDataTable = getContext().getDataset("user_secure_data");
Row userData = userDataTable.get(Bytes.toBytes(userId));
if (userData.isEmpty()) {
responder.sendError(404, "User data not found");
return;
}
// Filter sensitive fields based on user role
JsonObject response = filterSensitiveData(userData, getUserRole(userId));
responder.sendJson(200, response);
} catch (AccessException e) {
// Deliberately coarse 403 message; internal detail stays in the exception.
responder.sendError(403, "Access denied: " + e.getMessage());
} catch (Exception e) {
LOG.error("Error accessing secure data", e);
responder.sendError(500, "Internal error");
}
}
/**
 * Verifies the Authorization header carries a valid token and that the
 * token's role is allowed to read data.
 *
 * @throws AccessException if the token is missing/invalid or lacks permission
 */
private void validateUserAccess(HttpServiceRequest request) throws AccessException {
String authToken = request.getHeader("Authorization");
if (authToken == null || !isValidToken(authToken)) {
throw new AccessException("Invalid or missing authentication token");
}
String userRole = getUserRoleFromToken(authToken);
if (!hasDataAccessPermission(userRole)) {
throw new AccessException("Insufficient permissions for data access");
}
}
/**
 * Builds the response JSON, adding progressively more sensitive fields as
 * the role escalates: basic info for everyone, ssn/salary for admin and
 * data_analyst, address/phone for admin only.
 */
private JsonObject filterSensitiveData(Row userData, String userRole) {
JsonObject filtered = new JsonObject();
// Always include basic info
filtered.addProperty("id", userData.getString("id"));
filtered.addProperty("name", userData.getString("name"));
// Include sensitive data only for privileged roles
if ("admin".equals(userRole) || "data_analyst".equals(userRole)) {
filtered.addProperty("ssn", userData.getString("ssn"));
filtered.addProperty("salary", userData.getLong("salary"));
}
// Include PII only for admin role
if ("admin".equals(userRole)) {
filtered.addProperty("address", userData.getString("address"));
filtered.addProperty("phone", userData.getString("phone"));
}
return filtered;
}
// NOTE(review): placeholder check (prefix + length only) — replace with real
// token verification (signature/expiry) before production use.
private boolean isValidToken(String token) {
// Implementation for token validation
return token.startsWith("Bearer ") && token.length() > 50;
}
// NOTE(review): stub — always returns "user"; real implementation should
// decode the role claim from the token.
private String getUserRoleFromToken(String token) {
// Implementation for extracting user role from token
return "user"; // Default role
}
// NOTE(review): stub — always returns "user"; real implementation should
// look the role up by user id.
private String getUserRole(String userId) {
// Implementation for getting user role from user ID
return "user"; // Default role
}
// Every known role (including plain "user") may read; filtering above is the
// actual privilege boundary.
private boolean hasDataAccessPermission(String role) {
return Arrays.asList("admin", "data_analyst", "user").contains(role);
}
}CDAP provides comprehensive metadata management for tracking data schema, properties, tags, and relationships across all application components.
import io.cdap.cdap.api.metadata.*;
// Read access to entity metadata.
@Beta
public interface MetadataReader {
/**
 * Returns the metadata of the entity for every scope (USER and SYSTEM).
 *
 * @throws MetadataException if the metadata cannot be read
 */
Map<MetadataScope, Metadata> getMetadata(MetadataEntity metadataEntity) throws MetadataException;
/**
 * Returns the metadata of the entity in the single given scope.
 *
 * @throws MetadataException if the metadata cannot be read
 */
Metadata getMetadata(MetadataScope scope, MetadataEntity metadataEntity) throws MetadataException;
}
// Write access to entity metadata: add/remove properties and tags.
// Note: unlike MetadataReader, these methods declare no checked exceptions.
@Beta
public interface MetadataWriter {
/** Adds (or overwrites) the given key/value properties on the entity. */
void addProperties(MetadataEntity metadataEntity, Map<String, String> properties);
/** Adds the given tags to the entity. */
void addTags(MetadataEntity metadataEntity, String... tags);
/** Adds the given tags to the entity (iterable overload). */
void addTags(MetadataEntity metadataEntity, Iterable<String> tags);
/** Removes the properties with the given keys from the entity. */
void removeProperties(MetadataEntity metadataEntity, String... keys);
/** Removes the given tags from the entity. */
void removeTags(MetadataEntity metadataEntity, String... tags);
/** Removes all properties and tags from the entity. */
void removeMetadata(MetadataEntity metadataEntity);
}
// Immutable-style container of an entity's metadata: a property map plus a
// tag set.
public class Metadata {
// Shared empty instance — no properties, no tags.
public static final Metadata EMPTY = new Metadata(Collections.emptyMap(), Collections.emptySet());
/** Creates metadata from the given properties and tags. */
public Metadata(Map<String, String> properties, Set<String> tags) { /* constructor */ }
/** Returns the key/value properties. */
public Map<String, String> getProperties() { /* returns properties */ }
/** Returns the tags. */
public Set<String> getTags() { /* returns tags */ }
/** Returns true if there are no properties and no tags. */
public boolean isEmpty() { /* returns if metadata is empty */ }
}
// Hierarchical identifier of the thing metadata is attached to, built as an
// ordered list of (type, value) components via the Builder.
public class MetadataEntity {
/** Starts building a new entity. */
public static Builder builder() { return new Builder(); }
/** Returns the type of the entity (the type of its last component). */
public String getType() { /* returns entity type */ }
/** Returns the ordered value components of the entity. */
public List<String> getValue() { /* returns entity value components */ }
// Fluent builder accumulating (type, value) components in order.
public static class Builder {
/** Appends a (type, value) component. */
public Builder append(String type, String value) { /* append component */ return this; }
/** Appends a component whose value also designates the entity's type. */
public Builder appendAsType(String value) { /* append as type */ return this; }
/** Builds the immutable entity from the appended components. */
public MetadataEntity build() { /* build entity */ }
}
}
// Who owns a piece of metadata: end users or the platform itself.
public enum MetadataScope {
USER, // User-defined metadata
SYSTEM // System-generated metadata
}
/**
 * Exception raised when a metadata read or write operation fails.
 */
public class MetadataException extends Exception {

  /** Creates an exception with a descriptive message. */
  public MetadataException(String message) {
    super(message);
  }

  /** Creates an exception with a message and the triggering cause. */
  public MetadataException(String message, Throwable cause) {
    super(message, cause);
  }
}
// Metadata management in applications
// Example application bundling the metadata-centric programs below.
public class DataGovernanceApp extends AbstractApplication {

  /** Declares the application's identity and its three programs. */
  @Override
  public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
    // Identify the application.
    configurer.setName("DataGovernanceApp");
    configurer.setDescription("Manages data lineage and metadata");

    // Batch job that annotates customer data with metadata.
    configurer.addMapReduce(new MetadataEnrichmentMapReduce());

    // HTTP endpoint for querying and tagging entity metadata.
    configurer.addService(new MetadataQueryService());

    // Scheduled workflow for periodic metadata upkeep.
    configurer.addWorkflow(new MetadataMaintenanceWorkflow());
  }
}
// MapReduce program that enriches customer records (tier, data-quality score)
// and records metadata/tags for both the job itself and each customer record.
public class MetadataEnrichmentMapReduce extends AbstractMapReduce {
/**
 * Wires datasets and mapper/reducer, then tags and annotates this program's
 * own metadata entity so the job is discoverable in the metadata catalog.
 */
@Override
public void initialize(MapReduceContext context) throws Exception {
Job job = context.getHadoopJob();
context.setInput(Input.ofDataset("raw_customer_data"));
context.setOutput(Output.ofDataset("enriched_customer_data"));
job.setMapperClass(MetadataEnrichmentMapper.class);
job.setReducerClass(MetadataAggregationReducer.class);
// Add metadata about this processing job
MetadataWriter metadataWriter = context.getMetadataWriter();
MetadataEntity jobEntity = MetadataEntity.builder()
.appendAsType("program")
.append("application", context.getApplicationSpecification().getName())
.append("program", context.getSpecification().getName())
.build();
// Tag the processing job
metadataWriter.addTags(jobEntity, "customer-processing", "batch-enrichment", "daily");
// Add properties
Map<String, String> jobProperties = new HashMap<>();
jobProperties.put("data_source", "customer_database");
jobProperties.put("processing_type", "enrichment");
jobProperties.put("schedule", "daily");
jobProperties.put("owner", "data-team");
metadataWriter.addProperties(jobEntity, jobProperties);
}
// Mapper: builds a CustomerRecord per row, enriches it, and emits metadata
// about the record alongside the record itself.
public static class MetadataEnrichmentMapper extends Mapper<byte[], Row, Text, CustomerRecord> {
private MetadataWriter metadataWriter;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// Access metadata writer from context
// NOTE(review): assumes the Hadoop task Context can be cast to CDAP's
// MapReduceTaskContext at runtime — confirm against the CDAP runtime.
metadataWriter = ((MapReduceTaskContext<?>) context).getMetadataWriter();
}
/** Maps one raw customer row to an enriched, metadata-tracked record. */
@Override
protected void map(byte[] key, Row row, Context context)
throws IOException, InterruptedException {
String customerId = row.getString("customer_id");
CustomerRecord customer = new CustomerRecord();
customer.setId(customerId);
customer.setName(row.getString("name"));
customer.setEmail(row.getString("email"));
customer.setRegistrationDate(row.getLong("registration_date"));
// Enrich with metadata
enrichCustomerWithMetadata(customer);
// Track data lineage for the customer record
trackCustomerDataLineage(customerId, customer);
context.write(new Text(customerId), customer);
}
/** Fills in derived fields: account age (days), tier, data-quality score. */
private void enrichCustomerWithMetadata(CustomerRecord customer) {
// Add computed metadata
long daysSinceRegistration =
(System.currentTimeMillis() - customer.getRegistrationDate()) / (24 * 60 * 60 * 1000);
customer.setDaysSinceRegistration(daysSinceRegistration);
// Classify customer based on data
String customerTier = classifyCustomerTier(customer);
customer.setTier(customerTier);
// Add data quality score
double qualityScore = calculateDataQuality(customer);
customer.setDataQualityScore(qualityScore);
}
/** Writes per-customer tags and properties to the metadata system. */
private void trackCustomerDataLineage(String customerId, CustomerRecord customer) {
// Create metadata entity for this customer record
MetadataEntity customerEntity = MetadataEntity.builder()
.appendAsType("customer")
.append("id", customerId)
.build();
// Add metadata tags
metadataWriter.addTags(customerEntity, "customer-data", "pii", customer.getTier());
// Add metadata properties
Map<String, String> properties = new HashMap<>();
properties.put("data_classification", containsPII(customer) ? "sensitive" : "public");
properties.put("last_processed", String.valueOf(System.currentTimeMillis()));
properties.put("processing_job", "MetadataEnrichmentMapReduce");
properties.put("data_quality_score", String.valueOf(customer.getDataQualityScore()));
properties.put("customer_tier", customer.getTier());
metadataWriter.addProperties(customerEntity, properties);
}
// gold after one year of tenure, otherwise silver. Must run after
// enrichCustomerWithMetadata has set daysSinceRegistration.
private String classifyCustomerTier(CustomerRecord customer) {
// Logic to classify customer tier
return customer.getDaysSinceRegistration() > 365 ? "gold" : "silver";
}
// Score starts at 1.0 and is docked 0.3 for a missing name and 0.4 for a
// missing/invalid email; clamped at 0.
private double calculateDataQuality(CustomerRecord customer) {
// Logic to calculate data quality score
double score = 1.0;
if (customer.getName() == null || customer.getName().isEmpty()) score -= 0.3;
if (customer.getEmail() == null || !isValidEmail(customer.getEmail())) score -= 0.4;
return Math.max(0.0, score);
}
// Any non-null name or email counts as PII for classification purposes.
private boolean containsPII(CustomerRecord customer) {
return customer.getEmail() != null || customer.getName() != null;
}
// NOTE(review): deliberately naive check ("@" and "." anywhere) — replace
// with a real validator if correctness matters downstream.
private boolean isValidEmail(String email) {
return email.contains("@") && email.contains(".");
}
}
}
// HTTP service exposing read access and tag-write access to entity metadata.
@Path("/metadata")
public class MetadataQueryService extends AbstractHttpServiceHandler {

  /**
   * Returns all metadata for the entity identified by {type}/{id}, serialized
   * per scope as {"user": {properties, tags}, "system": {properties, tags}}.
   */
  @GET
  @Path("/entity/{type}/{id}")
  public void getEntityMetadata(HttpServiceRequest request, HttpServiceResponder responder,
                                @PathParam("type") String entityType,
                                @PathParam("id") String entityId) {
    try {
      MetadataReader metadataReader = getContext().getMetadataReader();
      MetadataEntity entity = MetadataEntity.builder()
          .appendAsType(entityType)
          .append("id", entityId)
          .build();
      Map<MetadataScope, Metadata> allMetadata = metadataReader.getMetadata(entity);
      JsonObject response = new JsonObject();
      for (Map.Entry<MetadataScope, Metadata> entry : allMetadata.entrySet()) {
        JsonObject scopeMetadata = new JsonObject();
        // Properties of this scope.
        JsonObject properties = new JsonObject();
        for (Map.Entry<String, String> prop : entry.getValue().getProperties().entrySet()) {
          properties.addProperty(prop.getKey(), prop.getValue());
        }
        scopeMetadata.add("properties", properties);
        // Tags of this scope.
        JsonArray tags = new JsonArray();
        for (String tag : entry.getValue().getTags()) {
          tags.add(tag);
        }
        scopeMetadata.add("tags", tags);
        response.add(entry.getKey().name().toLowerCase(), scopeMetadata);
      }
      responder.sendJson(200, response);
    } catch (MetadataException e) {
      responder.sendError(500, "Metadata query failed: " + e.getMessage());
    } catch (Exception e) {
      responder.sendError(500, "Internal error: " + e.getMessage());
    }
  }

  /**
   * Adds the tags listed in the JSON request body ({"tags": ["a", "b"]}) to
   * the entity identified by {type}/{id}.
   */
  @POST
  @Path("/entity/{type}/{id}/tags")
  public void addEntityTags(HttpServiceRequest request, HttpServiceResponder responder,
                            @PathParam("type") String entityType,
                            @PathParam("id") String entityId) {
    try {
      // Decode the body explicitly as UTF-8. StandardCharsets.UTF_8 replaces
      // Charset.forName("UTF-8"): same behavior, no runtime name lookup.
      String content = StandardCharsets.UTF_8.decode(
          ByteBuffer.wrap(request.getContent())).toString();
      JsonObject requestJson = new JsonParser().parse(content).getAsJsonObject();
      JsonArray tagsArray = requestJson.getAsJsonArray("tags");
      String[] tags = new String[tagsArray.size()];
      for (int i = 0; i < tagsArray.size(); i++) {
        tags[i] = tagsArray.get(i).getAsString();
      }
      MetadataWriter metadataWriter = getContext().getMetadataWriter();
      MetadataEntity entity = MetadataEntity.builder()
          .appendAsType(entityType)
          .append("id", entityId)
          .build();
      metadataWriter.addTags(entity, tags);
      responder.sendString(200, "Tags added successfully", "text/plain");
    } catch (Exception e) {
      responder.sendError(500, "Failed to add tags: " + e.getMessage());
    }
  }
}
Data lineage tracking provides comprehensive visibility into data flow, transformations, and dependencies across the entire data processing pipeline.
import io.cdap.cdap.api.lineage.field.*;
// Records field-level lineage operations for the running program.
public interface LineageRecorder {
/** Records the given lineage operations for later publication. */
void record(Collection<? extends Operation> operations);
/**
 * Publishes the recorded lineage. Batch programs flush automatically at the
 * end of a run; long-lived/streaming programs must call this themselves.
 * NOTE(review): the declared IllegalArgumentException is unchecked, so the
 * throws clause is informational only — presumably flagging inconsistent
 * operation sets; confirm against the implementation.
 */
void flushLineage() throws IllegalArgumentException;
}
// Base class for a single lineage operation: a named, described step with
// typed inputs and outputs connecting endpoints.
public abstract class Operation {
/** Creates an operation from its identity, type, and input/output endpoints. */
public Operation(String name, String description, OperationType type,
List<? extends EndPoint> inputs, List<? extends EndPoint> outputs) {
/* constructor */
}
/** Returns the unique operation name (referenced by InputField.getOrigin). */
public String getName() { /* returns operation name */ }
/** Returns the human-readable description. */
public String getDescription() { /* returns operation description */ }
/** Returns whether this is a read, write, or transform. */
public OperationType getType() { /* returns operation type */ }
/** Returns the input endpoints. */
public List<EndPoint> getInputs() { /* returns input endpoints */ }
/** Returns the output endpoints. */
public List<EndPoint> getOutputs() { /* returns output endpoints */ }
}
// Kinds of field-lineage operations. Constants follow the standard
// UPPER_SNAKE_CASE enum convention; the previous lowercase `write` and
// `transform` were a transcription error (the CDAP API defines READ,
// TRANSFORM, and WRITE).
public enum OperationType {
  READ,      // Read operation from external source
  WRITE,     // Write operation to external sink
  TRANSFORM  // Transformation operation
}
// Specific operation types
// Lineage operation reading the named fields from a source endpoint.
public class ReadOperation extends Operation {
/** Creates a read of the given fields from the source endpoint. */
public ReadOperation(String name, String description, EndPoint source,
String... fields) { /* constructor */ }
}
// Lineage operation writing fields (produced by earlier operations) to a sink.
public class WriteOperation extends Operation {
/** Creates a write of the given input fields to the sink endpoint. */
public WriteOperation(String name, String description, EndPoint sink,
InputField... inputs) { /* constructor */ }
}
// Lineage operation transforming input fields into named output fields.
public class TransformOperation extends Operation {
/** Creates a transform from the given input fields to the output field names. */
public TransformOperation(String name, String description,
List<InputField> inputs, String... outputs) { /* constructor */ }
}
// A source or sink of data in the lineage graph, identified by namespace and
// name, with optional descriptive properties.
public class EndPoint {
/** Creates an endpoint with no extra properties. */
public static EndPoint of(String namespace, String name) { /* create endpoint */ }
/** Creates an endpoint carrying additional descriptive properties. */
public static EndPoint of(String namespace, String name, Map<String, String> properties) { /* create with props */ }
/** Returns the namespace. */
public String getNamespace() { /* returns namespace */ }
/** Returns the endpoint name. */
public String getName() { /* returns name */ }
/** Returns the descriptive properties. */
public Map<String, String> getProperties() { /* returns properties */ }
}
// Reference to a field produced by an earlier operation, used as the input of
// a subsequent transform or write.
public class InputField {
/** Creates a reference to fieldName as produced by operationName. */
public static InputField of(String operationName, String fieldName) { /* create input field */ }
/** Returns the name of the operation that produced the field. */
public String getOrigin() { /* returns origin operation */ }
/** Returns the field name. */
public String getField() { /* returns field name */ }
}
// MapReduce program with comprehensive lineage tracking
// MapReduce program joining customer profiles, purchases, and support tickets
// into a customer-analytics dataset, declaring full field-level lineage for
// every read, transform, and write.
public class CustomerAnalyticsMapReduce extends AbstractMapReduce {
/** Wires the three inputs and one output, then records the job's lineage. */
@Override
public void initialize(MapReduceContext context) throws Exception {
Job job = context.getHadoopJob();
context.setInput(Input.ofDataset("customer_profiles"));
context.addInput(Input.ofDataset("purchase_history"));
context.addInput(Input.ofDataset("support_tickets"));
context.setOutput(Output.ofDataset("customer_analytics"));
job.setMapperClass(CustomerAnalyticsMapper.class);
job.setReducerClass(CustomerAnalyticsReducer.class);
// Record lineage for this MapReduce job
recordJobLineage(context);
}
/**
 * Declares the lineage graph: three reads, three transforms (purchase
 * metrics, support metrics, merge), and one write. Operation names here
 * (e.g. "read_purchase_history") are the origins referenced by InputField.
 */
private void recordJobLineage(MapReduceContext context) {
// NOTE(review): assumes MapReduceContext implements LineageRecorder (the
// assignment below requires it) — confirm against the CDAP API.
LineageRecorder lineageRecorder = context;
// Define input endpoints
EndPoint customerProfilesEP = EndPoint.of(context.getNamespace(), "customer_profiles");
EndPoint purchaseHistoryEP = EndPoint.of(context.getNamespace(), "purchase_history");
EndPoint supportTicketsEP = EndPoint.of(context.getNamespace(), "support_tickets");
// Define output endpoint
EndPoint customerAnalyticsEP = EndPoint.of(context.getNamespace(), "customer_analytics");
List<Operation> operations = new ArrayList<>();
// Record read operations
operations.add(new ReadOperation("read_customer_profiles",
"Read customer profile data", customerProfilesEP,
"customer_id", "name", "email", "registration_date", "tier"));
operations.add(new ReadOperation("read_purchase_history",
"Read customer purchase history", purchaseHistoryEP,
"customer_id", "purchase_date", "amount", "product_category"));
operations.add(new ReadOperation("read_support_tickets",
"Read customer support interactions", supportTicketsEP,
"customer_id", "ticket_date", "issue_type", "resolution_time"));
// Record transformation operations
operations.add(new TransformOperation("calculate_purchase_metrics",
"Calculate purchase-related metrics per customer",
Arrays.asList(
InputField.of("read_purchase_history", "customer_id"),
InputField.of("read_purchase_history", "amount"),
InputField.of("read_purchase_history", "purchase_date")
),
"total_spent", "avg_order_value", "purchase_frequency", "last_purchase_date"
));
operations.add(new TransformOperation("calculate_support_metrics",
"Calculate support-related metrics per customer",
Arrays.asList(
InputField.of("read_support_tickets", "customer_id"),
InputField.of("read_support_tickets", "ticket_date"),
InputField.of("read_support_tickets", "resolution_time")
),
"total_tickets", "avg_resolution_time", "last_ticket_date"
));
operations.add(new TransformOperation("merge_customer_data",
"Merge profile, purchase, and support data per customer",
Arrays.asList(
InputField.of("read_customer_profiles", "customer_id"),
InputField.of("read_customer_profiles", "name"),
InputField.of("read_customer_profiles", "email"),
InputField.of("read_customer_profiles", "tier"),
InputField.of("calculate_purchase_metrics", "total_spent"),
InputField.of("calculate_purchase_metrics", "avg_order_value"),
InputField.of("calculate_purchase_metrics", "purchase_frequency"),
InputField.of("calculate_support_metrics", "total_tickets"),
InputField.of("calculate_support_metrics", "avg_resolution_time")
),
"customer_id", "name", "email", "tier", "total_spent", "avg_order_value",
"purchase_frequency", "total_tickets", "avg_resolution_time", "customer_score"
));
// Record write operation
operations.add(new WriteOperation("write_customer_analytics",
"Write aggregated customer analytics", customerAnalyticsEP,
InputField.of("merge_customer_data", "customer_id"),
InputField.of("merge_customer_data", "name"),
InputField.of("merge_customer_data", "email"),
InputField.of("merge_customer_data", "tier"),
InputField.of("merge_customer_data", "total_spent"),
InputField.of("merge_customer_data", "avg_order_value"),
InputField.of("merge_customer_data", "purchase_frequency"),
InputField.of("merge_customer_data", "total_tickets"),
InputField.of("merge_customer_data", "avg_resolution_time"),
InputField.of("merge_customer_data", "customer_score")
));
// Record all operations for lineage
lineageRecorder.record(operations);
}
// Mapper and Reducer implementations would process the actual data
// while the lineage operations describe the data flow and transformations
}
// Spark Structured Streaming program: joins Kafka user events with static user
// profiles, writes recommendation scores per batch, and records/flushes
// lineage manually (required for long-running streaming programs).
public class RealTimeRecommendationSpark extends AbstractSpark {
/**
 * Builds the streaming pipeline, records its lineage once up front, and
 * flushes lineage every 10 micro-batches from within foreachBatch.
 */
@Override
public void run(SparkClientContext context) throws Exception {
SparkSession spark = context.getSparkSession();
// NOTE(review): assumes SparkClientContext implements LineageRecorder —
// confirm against the CDAP API.
LineageRecorder lineageRecorder = context;
// Read streaming data
Dataset<Row> userEvents = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.load();
// Read static user profiles
JavaPairRDD<byte[], Row> profilesRDD = context.fromDataset("user_profiles");
Dataset<Row> userProfiles = spark.createDataFrame(profilesRDD.map(Tuple2::_2).rdd(), getProfileSchema());
// Join events to profiles, count events per (user, category), derive a
// score, and keep only scores above the threshold.
Dataset<Row> recommendations = userEvents
.join(userProfiles, "user_id")
.groupBy("user_id", "category")
.agg(count("event").as("event_count"))
.withColumn("recommendation_score", expr("event_count * 0.8"))
.filter("recommendation_score > 5");
// Record lineage for streaming operations
recordStreamingLineage(lineageRecorder, context.getNamespace());
// Write recommendations and flush lineage periodically
StreamingQuery query = recommendations
.writeStream()
.outputMode("update")
.foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
// Convert each batch row to a Put keyed by "<user_id>_<category>".
JavaPairRDD<byte[], Put> outputRDD = batchDF.javaRDD().mapToPair(row -> {
String key = row.getAs("user_id") + "_" + row.getAs("category");
Put put = new Put(Bytes.toBytes(key));
put.add("rec", "user_id", row.getAs("user_id"));
put.add("rec", "category", row.getAs("category"));
put.add("rec", "score", row.getAs("recommendation_score"));
put.add("rec", "batch_id", batchId);
return new Tuple2<>(Bytes.toBytes(key), put);
});
context.saveAsDataset(outputRDD, "user_recommendations");
// Manually flush lineage for streaming program
if (batchId % 10 == 0) { // Flush every 10 batches
lineageRecorder.flushLineage();
}
})
.start();
// Block until the streaming query terminates.
query.awaitTermination();
}
/**
 * Declares the streaming lineage graph: Kafka + dataset reads, two
 * transforms (join, scoring), and the dataset write. Operation names are
 * the origins referenced by InputField.
 */
private void recordStreamingLineage(LineageRecorder recorder, String namespace) {
List<Operation> operations = new ArrayList<>();
// Input endpoints
Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put("topic", "user-events");
kafkaProps.put("format", "json");
EndPoint userEventsEP = EndPoint.of("kafka", "user-events", kafkaProps);
EndPoint userProfilesEP = EndPoint.of(namespace, "user_profiles");
EndPoint recommendationsEP = EndPoint.of(namespace, "user_recommendations");
// Read operations
operations.add(new ReadOperation("read_user_events",
"Stream user events from Kafka", userEventsEP,
"user_id", "event_type", "category", "timestamp"));
operations.add(new ReadOperation("read_user_profiles",
"Read static user profile data", userProfilesEP,
"user_id", "preferences", "demographics"));
// Transform operations
operations.add(new TransformOperation("join_events_profiles",
"Join streaming events with user profiles",
Arrays.asList(
InputField.of("read_user_events", "user_id"),
InputField.of("read_user_events", "event_type"),
InputField.of("read_user_events", "category"),
InputField.of("read_user_profiles", "user_id"),
InputField.of("read_user_profiles", "preferences")
),
"user_id", "event_type", "category", "preferences"
));
operations.add(new TransformOperation("calculate_recommendations",
"Calculate recommendation scores based on event patterns",
Arrays.asList(
InputField.of("join_events_profiles", "user_id"),
InputField.of("join_events_profiles", "category"),
InputField.of("join_events_profiles", "event_type")
),
"user_id", "category", "recommendation_score"
));
// Write operation
operations.add(new WriteOperation("write_recommendations",
"Write real-time recommendations", recommendationsEP,
InputField.of("calculate_recommendations", "user_id"),
InputField.of("calculate_recommendations", "category"),
InputField.of("calculate_recommendations", "recommendation_score")
));
recorder.record(operations);
}
}The Security & Metadata framework in CDAP enables comprehensive data governance with enterprise-grade security features, detailed metadata tracking, and complete data lineage visibility across all data processing operations.
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap