CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap

The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.

Pending
Overview
Eval results
Files

docs/security-metadata.md

Security & Metadata

CDAP provides comprehensive security and metadata management capabilities for enterprise data governance. These features enable secure storage of credentials, comprehensive metadata tracking, data lineage recording, and access control across all application components.

Security Framework

Secure Store

The Secure Store provides encrypted, centralized storage for sensitive configuration data such as passwords, API keys, and certificates.

import io.cdap.cdap.api.security.store.*;

/**
 * Read-only access to the secure store, which holds sensitive values
 * (passwords, API keys, certificates) encrypted at rest.
 */
@Beta
public interface SecureStore {

    /**
     * Lists metadata for all secrets stored in the given namespace.
     *
     * @throws Exception if the listing fails
     */
    List<SecureStoreMetadata> list(String namespace) throws Exception;

    /**
     * Returns the named secret's data together with its metadata.
     *
     * @throws Exception if the secret does not exist or cannot be read
     */
    SecureStoreData get(String namespace, String name) throws Exception;

    /**
     * Returns only the metadata of a secret. The default implementation
     * fetches the full {@link SecureStoreData} and extracts its metadata
     * (the previous version returned the SecureStoreData itself, which is
     * not a SecureStoreMetadata and would not compile).
     */
    default SecureStoreMetadata getMetadata(String namespace, String name) throws Exception {
        return get(namespace, name).getMetadata();
    }

    /**
     * Returns only the raw secret bytes.
     */
    default byte[] getData(String namespace, String name) throws Exception {
        return get(namespace, name).get();
    }
}

/**
 * Write-side counterpart of {@link SecureStore}: adds the ability to
 * create and delete secrets in addition to reading them.
 */
@Beta
public interface SecureStoreManager extends SecureStore {
    
    /**
     * Stores (or overwrites) a secret in the given namespace.
     *
     * @param namespace   namespace the secret belongs to
     * @param name        unique name of the secret within the namespace
     * @param data        the sensitive value to store
     * @param description human-readable description of the secret
     * @param properties  arbitrary key/value properties attached to the secret
     * @throws Exception if the secret cannot be written
     */
    void put(String namespace, String name, String data, String description, 
             Map<String, String> properties) throws Exception;
    
    /**
     * Deletes the named secret from the namespace.
     *
     * @throws Exception if the secret cannot be removed
     */
    void delete(String namespace, String name) throws Exception;
}

// Secure store data container
public class SecureStoreData {
    public byte[] get() { /* returns secret data */ }
    public String getName() { /* returns secret name */ }
    public String getDescription() { /* returns description */ }
    public long getCreationTimeMs() { /* returns creation timestamp */ }
    public Map<String, String> getProperties() { /* returns properties */ }
}

/**
 * Descriptive information about a stored secret (everything except the
 * secret bytes themselves): name, description, timestamps, properties.
 */
public class SecureStoreMetadata {
    public String getName() { /* returns secret name */ }
    public String getDescription() { /* returns description */ }
    public long getCreationTimeMs() { /* returns creation timestamp */ }
    public long getLastModifiedTimeMs() { /* returns last modified timestamp */ }
    public Map<String, String> getProperties() { /* returns properties */ }
}

Secure Store Usage Examples

/**
 * Application that wires together programs which read their credentials
 * from the CDAP secure store instead of hard-coding them.
 */
public class DatabaseConnectorApp extends AbstractApplication {
    
    @Override
    public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
        // Identify the application.
        configurer.setName("DatabaseConnector");
        configurer.setDescription("Connects to external databases securely");
        
        // Register the programs: an HTTP service that uses API keys from
        // the secure store, and a batch job that uses stored DB credentials.
        // (Registration order is not significant.)
        configurer.addService(new SecureAPIService());
        configurer.addMapReduce(new SecureDataExtractionMapReduce());
    }
}

/**
 * MapReduce program that pulls database credentials out of the CDAP
 * secure store during initialization and uses them to enrich rows from
 * the "source_data" dataset with values looked up in an external JDBC
 * database, writing results to "extracted_data".
 */
public class SecureDataExtractionMapReduce extends AbstractMapReduce {
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        
        // Retrieve database credentials from the secure store.
        // Assumes the Admin returned by getAdmin() implements SecureStore
        // — TODO confirm for the CDAP version in use.
        SecureStore secureStore = context.getAdmin();
        SecureStoreData dbPassword = secureStore.get(context.getNamespace(), "db-password");
        SecureStoreData dbUsername = secureStore.get(context.getNamespace(), "db-username");
        SecureStoreData dbUrl = secureStore.get(context.getNamespace(), "db-connection-url");
        
        // Copy credentials into the job configuration for the mappers.
        // NOTE(review): values in the Hadoop Configuration are visible to
        // anything that can read the job config — consider the Hadoop
        // Credentials API for the password instead.
        Configuration conf = job.getConfiguration();
        conf.set("db.url", new String(dbUrl.get()));
        conf.set("db.username", new String(dbUsername.get()));
        conf.set("db.password", new String(dbPassword.get()));
        
        // Set input/output configuration
        context.setInput(Input.ofDataset("source_data"));
        context.setOutput(Output.ofDataset("extracted_data"));
        
        job.setMapperClass(SecureDataMapper.class);
    }
    
    /**
     * Mapper that opens a JDBC connection in setup() using the credentials
     * placed in the job configuration, enriches each row via a lookup
     * query, and closes the connection in cleanup().
     */
    public static class SecureDataMapper extends Mapper<byte[], Row, byte[], Put> {
        // JDBC connection reused across map() calls of this task.
        private Connection dbConnection;
        
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            
            // Initialize the connection from the credentials that
            // initialize() stored in the job configuration.
            try {
                String url = conf.get("db.url");
                String username = conf.get("db.username");
                String password = conf.get("db.password");
                
                dbConnection = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                throw new IOException("Failed to connect to database", e);
            }
        }
        
        @Override
        protected void map(byte[] key, Row row, Context context) 
            throws IOException, InterruptedException {
            
            try {
                // Look up enrichment data with a parameterized query
                // (PreparedStatement avoids SQL injection).
                String query = "SELECT enrichment_data FROM lookup_table WHERE id = ?";
                try (PreparedStatement stmt = dbConnection.prepareStatement(query)) {
                    stmt.setString(1, row.getString("id"));
                    
                    try (ResultSet rs = stmt.executeQuery()) {
                        if (rs.next()) {
                            // Emit original columns plus the enrichment
                            // value and a processing timestamp. Rows with
                            // no lookup match are silently dropped.
                            Put put = new Put(key);
                            put.add("original", row.getColumns());
                            put.add("enriched_data", rs.getString("enrichment_data"));
                            put.add("enrichment_timestamp", System.currentTimeMillis());
                            
                            context.write(key, put);
                        }
                    }
                }
            } catch (SQLException e) {
                throw new IOException("Database operation failed", e);
            }
        }
        
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Best-effort close: a failure here should not fail the job.
            if (dbConnection != null) {
                try {
                    dbConnection.close();
                } catch (SQLException e) {
                    // Log error but don't fail the job
                    LOG.warn("Failed to close database connection", e);
                }
            }
        }
    }
}

/**
 * HTTP service handler that loads external-API credentials from the
 * secure store once at initialization and reuses them for every request.
 */
@Path("/api")
public class SecureAPIService extends AbstractHttpServiceHandler {
    
    // Credentials loaded from the secure store in initialize().
    private String apiKey;
    private String apiSecret;
    
    @Override
    public void initialize(HttpServiceContext context) throws Exception {
        super.initialize(context);
        
        // Retrieve API credentials from the secure store.
        // NOTE(review): new String(byte[]) uses the platform default
        // charset — confirm the secrets were stored as UTF-8 and pass the
        // charset explicitly if so.
        SecureStore secureStore = context.getAdmin();
        apiKey = new String(secureStore.getData(context.getNamespace(), "external-api-key"));
        apiSecret = new String(secureStore.getData(context.getNamespace(), "external-api-secret"));
    }
    
    /**
     * GET /api/external-data/{id} — fetches data for the given id from the
     * external API using the stored credentials; 500 on any failure.
     */
    @GET
    @Path("/external-data/{id}")
    public void fetchExternalData(HttpServiceRequest request, HttpServiceResponder responder,
                                 @PathParam("id") String id) {
        try {
            // Make authenticated API call using secure credentials.
            // NOTE(review): externalData is already a JSON string —
            // verify sendJson does not re-serialize String payloads.
            String externalData = callExternalAPI(id, apiKey, apiSecret);
            responder.sendJson(200, externalData);
        } catch (Exception e) {
            LOG.error("Failed to fetch external data for ID: {}", id, e);
            responder.sendError(500, "Failed to fetch external data");
        }
    }
    
    // Placeholder for the real authenticated HTTPS call.
    private String callExternalAPI(String id, String key, String secret) throws IOException {
        // Implementation for secure external API calls
        // Use HTTPS, proper authentication headers, etc.
        return "{}"; // Placeholder
    }
}

/**
 * Custom action that provisions the secrets an application needs,
 * typically run once as part of a deployment workflow.
 */
public class SecureStoreManagementAction extends AbstractCustomAction {
    
    @Override
    public void run(CustomActionContext context) throws Exception {
        final SecureStoreManager secrets = context.getAdmin();
        final String ns = context.getNamespace();
        
        // Shared properties describing the database credentials.
        Map<String, String> dbProps = new HashMap<>();
        dbProps.put("environment", "production");
        dbProps.put("database_type", "postgresql");
        
        // Database credentials (in a real deployment these values would
        // come from the deployment configuration, not literals).
        secrets.put(ns, "db-username", "prod_db_user",
                    "Production database username", dbProps);
        secrets.put(ns, "db-password", "secure_password_123",
                    "Production database password", dbProps);
        secrets.put(ns, "db-connection-url",
                    "jdbc:postgresql://prod-db:5432/main",
                    "Production database URL", dbProps);
        
        // Shared properties describing the external API credentials.
        Map<String, String> apiProps = new HashMap<>();
        apiProps.put("service", "external-analytics");
        apiProps.put("access_level", "read");
        
        secrets.put(ns, "external-api-key", "api_key_xyz789",
                    "External analytics API key", apiProps);
        secrets.put(ns, "external-api-secret", "secret_abc456",
                    "External analytics API secret", apiProps);
        
        // Emit a metric for the five secrets configured above.
        context.getMetrics().count("secure_store.secrets_configured", 5);
    }
}

Access Control

import io.cdap.cdap.api.security.AccessException;

/**
 * Checked exception signaling a security or authorization violation.
 */
public class AccessException extends Exception {
    public AccessException(String message) { super(message); }
    public AccessException(String message, Throwable cause) { super(message, cause); }
}

/**
 * Example handler demonstrating request authentication, role-based
 * filtering of sensitive fields, and use of {@link AccessException} for
 * authorization failures.
 */
public class SecurityAwareService extends AbstractHttpServiceHandler {
    
    /**
     * GET /secure-data — returns the caller's record with fields filtered
     * by role. Responds 403 on auth failure, 404 when no data exists.
     */
    @GET
    @Path("/secure-data")
    public void getSecureData(HttpServiceRequest request, HttpServiceResponder responder) {
        try {
            // Validate user permissions (throws AccessException on failure)
            validateUserAccess(request);
            
            // Access secure data
            String userId = request.getHeader("X-User-ID");
            Table userDataTable = getContext().getDataset("user_secure_data");
            
            Row userData = userDataTable.get(Bytes.toBytes(userId));
            if (userData.isEmpty()) {
                responder.sendError(404, "User data not found");
                return;
            }
            
            // Filter sensitive fields based on user role.
            // NOTE(review): the role here comes from the user id, while
            // validateUserAccess derives it from the token — confirm the
            // two sources agree.
            JsonObject response = filterSensitiveData(userData, getUserRole(userId));
            responder.sendJson(200, response);
            
        } catch (AccessException e) {
            responder.sendError(403, "Access denied: " + e.getMessage());
        } catch (Exception e) {
            LOG.error("Error accessing secure data", e);
            responder.sendError(500, "Internal error");
        }
    }
    
    /**
     * Throws AccessException unless the request carries a valid token
     * whose role has data-access permission.
     */
    private void validateUserAccess(HttpServiceRequest request) throws AccessException {
        String authToken = request.getHeader("Authorization");
        if (authToken == null || !isValidToken(authToken)) {
            throw new AccessException("Invalid or missing authentication token");
        }
        
        String userRole = getUserRoleFromToken(authToken);
        if (!hasDataAccessPermission(userRole)) {
            throw new AccessException("Insufficient permissions for data access");
        }
    }
    
    /**
     * Builds the response JSON, adding sensitive fields only for
     * privileged roles and PII only for admins.
     */
    private JsonObject filterSensitiveData(Row userData, String userRole) {
        JsonObject filtered = new JsonObject();
        
        // Always include basic info
        filtered.addProperty("id", userData.getString("id"));
        filtered.addProperty("name", userData.getString("name"));
        
        // Include sensitive data only for privileged roles
        if ("admin".equals(userRole) || "data_analyst".equals(userRole)) {
            filtered.addProperty("ssn", userData.getString("ssn"));
            filtered.addProperty("salary", userData.getLong("salary"));
        }
        
        // Include PII only for admin role
        if ("admin".equals(userRole)) {
            filtered.addProperty("address", userData.getString("address"));
            filtered.addProperty("phone", userData.getString("phone"));
        }
        
        return filtered;
    }
    
    // Placeholder validation: checks only the Bearer prefix and a
    // minimum length — replace with real token verification.
    private boolean isValidToken(String token) {
        // Implementation for token validation
        return token.startsWith("Bearer ") && token.length() > 50;
    }
    
    // Placeholder: extract the role claim from the token.
    private String getUserRoleFromToken(String token) {
        // Implementation for extracting user role from token
        return "user"; // Default role
    }
    
    // Placeholder: look up the role for a user id.
    private String getUserRole(String userId) {
        // Implementation for getting user role from user ID
        return "user"; // Default role
    }
    
    // Every known role currently has data access; tighten as needed.
    private boolean hasDataAccessPermission(String role) {
        return Arrays.asList("admin", "data_analyst", "user").contains(role);
    }
}

Metadata Management

Metadata Framework

CDAP provides comprehensive metadata management for tracking data schema, properties, tags, and relationships across all application components.

import io.cdap.cdap.api.metadata.*;

/**
 * Read access to metadata (properties and tags) recorded for CDAP
 * entities.
 */
@Beta
public interface MetadataReader {
    
    /**
     * Returns all metadata for the entity, keyed by scope (USER, SYSTEM).
     */
    Map<MetadataScope, Metadata> getMetadata(MetadataEntity metadataEntity) throws MetadataException;
    
    /**
     * Returns the entity's metadata within a single scope.
     */
    Metadata getMetadata(MetadataScope scope, MetadataEntity metadataEntity) throws MetadataException;
}

/**
 * Write access to metadata: add or remove properties and tags on
 * entities.
 */
@Beta
public interface MetadataWriter {
    
    /** Adds (or overwrites) the given properties on the entity. */
    void addProperties(MetadataEntity metadataEntity, Map<String, String> properties);
    
    /** Adds the given tags to the entity (varargs or Iterable form). */
    void addTags(MetadataEntity metadataEntity, String... tags);
    void addTags(MetadataEntity metadataEntity, Iterable<String> tags);
    
    /** Removes the named properties from the entity. */
    void removeProperties(MetadataEntity metadataEntity, String... keys);
    
    /** Removes the given tags from the entity. */
    void removeTags(MetadataEntity metadataEntity, String... tags);
    
    /** Removes all properties and tags recorded for the entity. */
    void removeMetadata(MetadataEntity metadataEntity);
}

/**
 * Container of an entity's metadata: a property map plus a tag set.
 * {@link #EMPTY} is the shared no-metadata instance.
 */
public class Metadata {
    public static final Metadata EMPTY = new Metadata(Collections.emptyMap(), Collections.emptySet());
    
    public Metadata(Map<String, String> properties, Set<String> tags) { /* constructor */ }
    
    public Map<String, String> getProperties() { /* returns properties */ }
    public Set<String> getTags() { /* returns tags */ }
    public boolean isEmpty() { /* returns if metadata is empty */ }
}

/**
 * Hierarchical identifier of the thing metadata is attached to, built
 * as an ordered list of (type, value) components via the Builder.
 */
public class MetadataEntity {
    public static Builder builder() { return new Builder(); }
    
    public String getType() { /* returns entity type */ }
    public List<String> getValue() { /* returns entity value components */ }
    
    /** Fluent builder: append components, then call build(). */
    public static class Builder {
        public Builder append(String type, String value) { /* append component */ return this; }
        public Builder appendAsType(String value) { /* append as type */ return this; }
        public MetadataEntity build() { /* build entity */ }
    }
}

/**
 * The two scopes metadata can live in.
 */
public enum MetadataScope {
    USER,      // User-defined metadata
    SYSTEM     // System-generated metadata
}

/**
 * Checked exception thrown when a metadata read or write fails.
 */
public class MetadataException extends Exception {
    public MetadataException(String message) { super(message); }
    public MetadataException(String message, Throwable cause) { super(message, cause); }
}

Metadata Usage Examples

/**
 * Application wiring for the data-governance pipeline: a batch job that
 * enriches records with metadata, a query service over that metadata,
 * and a maintenance workflow.
 */
public class DataGovernanceApp extends AbstractApplication {
    
    @Override
    public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
        // Identify the application.
        configurer.setName("DataGovernanceApp");
        configurer.setDescription("Manages data lineage and metadata");
        
        // Register the workflow for periodic metadata updates, the
        // metadata query service, and the enrichment batch job.
        // (Registration order is not significant.)
        configurer.addWorkflow(new MetadataMaintenanceWorkflow());
        configurer.addService(new MetadataQueryService());
        configurer.addMapReduce(new MetadataEnrichmentMapReduce());
    }
}

/**
 * Batch job that enriches raw customer records with computed metadata
 * (tenure, tier, data-quality score) and records per-record metadata
 * tags and properties for governance.
 */
public class MetadataEnrichmentMapReduce extends AbstractMapReduce {
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        
        context.setInput(Input.ofDataset("raw_customer_data"));
        context.setOutput(Output.ofDataset("enriched_customer_data"));
        
        job.setMapperClass(MetadataEnrichmentMapper.class);
        job.setReducerClass(MetadataAggregationReducer.class);
        
        // Add metadata about this processing job itself, identified by
        // application name + program name.
        MetadataWriter metadataWriter = context.getMetadataWriter();
        MetadataEntity jobEntity = MetadataEntity.builder()
            .appendAsType("program")
            .append("application", context.getApplicationSpecification().getName())
            .append("program", context.getSpecification().getName())
            .build();
            
        // Tag the processing job
        metadataWriter.addTags(jobEntity, "customer-processing", "batch-enrichment", "daily");
        
        // Add descriptive properties for the job
        Map<String, String> jobProperties = new HashMap<>();
        jobProperties.put("data_source", "customer_database");
        jobProperties.put("processing_type", "enrichment");
        jobProperties.put("schedule", "daily");
        jobProperties.put("owner", "data-team");
        metadataWriter.addProperties(jobEntity, jobProperties);
    }
    
    /** Mapper that enriches each row and records per-customer metadata. */
    public static class MetadataEnrichmentMapper extends Mapper<byte[], Row, Text, CustomerRecord> {
        
        private MetadataWriter metadataWriter;
        
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Access metadata writer from context.
            // NOTE(review): casting the Hadoop mapper Context to
            // MapReduceTaskContext assumes CDAP supplies such a context
            // here — confirm for the CDAP version in use.
            metadataWriter = ((MapReduceTaskContext<?>) context).getMetadataWriter();
        }
        
        @Override
        protected void map(byte[] key, Row row, Context context) 
            throws IOException, InterruptedException {
            
            String customerId = row.getString("customer_id");
            CustomerRecord customer = new CustomerRecord();
            customer.setId(customerId);
            customer.setName(row.getString("name"));
            customer.setEmail(row.getString("email"));
            customer.setRegistrationDate(row.getLong("registration_date"));
            
            // Enrich with computed metadata (tenure, tier, quality score)
            enrichCustomerWithMetadata(customer);
            
            // Track data lineage for the customer record
            trackCustomerDataLineage(customerId, customer);
            
            context.write(new Text(customerId), customer);
        }
        
        // Derives tenure, tier and data-quality score from the record.
        private void enrichCustomerWithMetadata(CustomerRecord customer) {
            // Whole days elapsed since registration (epoch millis)
            long daysSinceRegistration = 
                (System.currentTimeMillis() - customer.getRegistrationDate()) / (24 * 60 * 60 * 1000);
            customer.setDaysSinceRegistration(daysSinceRegistration);
            
            // Classify customer based on data
            String customerTier = classifyCustomerTier(customer);
            customer.setTier(customerTier);
            
            // Add data quality score
            double qualityScore = calculateDataQuality(customer);
            customer.setDataQualityScore(qualityScore);
        }
        
        // Records tags and properties on this customer's metadata entity.
        private void trackCustomerDataLineage(String customerId, CustomerRecord customer) {
            // Create metadata entity for this customer record
            MetadataEntity customerEntity = MetadataEntity.builder()
                .appendAsType("customer")
                .append("id", customerId)
                .build();
                
            // Add metadata tags
            metadataWriter.addTags(customerEntity, "customer-data", "pii", customer.getTier());
            
            // Add metadata properties
            Map<String, String> properties = new HashMap<>();
            properties.put("data_classification", containsPII(customer) ? "sensitive" : "public");
            properties.put("last_processed", String.valueOf(System.currentTimeMillis()));
            properties.put("processing_job", "MetadataEnrichmentMapReduce");
            properties.put("data_quality_score", String.valueOf(customer.getDataQualityScore()));
            properties.put("customer_tier", customer.getTier());
            
            metadataWriter.addProperties(customerEntity, properties);
        }
        
        // Tier by tenure: more than a year -> "gold", otherwise "silver".
        private String classifyCustomerTier(CustomerRecord customer) {
            // Logic to classify customer tier
            return customer.getDaysSinceRegistration() > 365 ? "gold" : "silver";
        }
        
        // Score starts at 1.0; missing name costs 0.3, bad email 0.4.
        private double calculateDataQuality(CustomerRecord customer) {
            // Logic to calculate data quality score
            double score = 1.0;
            if (customer.getName() == null || customer.getName().isEmpty()) score -= 0.3;
            if (customer.getEmail() == null || !isValidEmail(customer.getEmail())) score -= 0.4;
            return Math.max(0.0, score);
        }
        
        // A record with either an email or a name is treated as PII.
        private boolean containsPII(CustomerRecord customer) {
            return customer.getEmail() != null || customer.getName() != null;
        }
        
        // Minimal sanity check, not full RFC email validation.
        private boolean isValidEmail(String email) {
            return email.contains("@") && email.contains(".");
        }
    }
}

/**
 * HTTP service exposing metadata reads and tag writes over REST.
 */
@Path("/metadata")
public class MetadataQueryService extends AbstractHttpServiceHandler {
    
    /**
     * GET /metadata/entity/{type}/{id} — returns all metadata for the
     * entity as JSON keyed by scope ("user"/"system"), each scope holding
     * a properties object and a tags array.
     */
    @GET
    @Path("/entity/{type}/{id}")
    public void getEntityMetadata(HttpServiceRequest request, HttpServiceResponder responder,
                                 @PathParam("type") String entityType,
                                 @PathParam("id") String entityId) {
        try {
            MetadataReader metadataReader = getContext().getMetadataReader();
            
            MetadataEntity entity = MetadataEntity.builder()
                .appendAsType(entityType)
                .append("id", entityId)
                .build();
                
            Map<MetadataScope, Metadata> allMetadata = metadataReader.getMetadata(entity);
            
            JsonObject response = new JsonObject();
            for (Map.Entry<MetadataScope, Metadata> entry : allMetadata.entrySet()) {
                JsonObject scopeMetadata = new JsonObject();
                
                // Add properties
                JsonObject properties = new JsonObject();
                for (Map.Entry<String, String> prop : entry.getValue().getProperties().entrySet()) {
                    properties.addProperty(prop.getKey(), prop.getValue());
                }
                scopeMetadata.add("properties", properties);
                
                // Add tags
                JsonArray tags = new JsonArray();
                for (String tag : entry.getValue().getTags()) {
                    tags.add(tag);
                }
                scopeMetadata.add("tags", tags);
                
                // Lower-cased scope enum name becomes the JSON key
                response.add(entry.getKey().name().toLowerCase(), scopeMetadata);
            }
            
            responder.sendJson(200, response);
            
        } catch (MetadataException e) {
            responder.sendError(500, "Metadata query failed: " + e.getMessage());
        } catch (Exception e) {
            responder.sendError(500, "Internal error: " + e.getMessage());
        }
    }
    
    /**
     * POST /metadata/entity/{type}/{id}/tags — adds the tags listed in
     * the request body ({"tags": [...]}) to the entity.
     */
    @POST
    @Path("/entity/{type}/{id}/tags")
    public void addEntityTags(HttpServiceRequest request, HttpServiceResponder responder,
                             @PathParam("type") String entityType,
                             @PathParam("id") String entityId) {
        try {
            // Decode the request body as UTF-8 text.
            // (StandardCharsets.UTF_8 would avoid the lookup by name.)
            String content = Charset.forName("UTF-8").decode(
                ByteBuffer.wrap(request.getContent())).toString();
            JsonObject requestJson = new JsonParser().parse(content).getAsJsonObject();
            
            // Copy the "tags" JSON array into a String[] for addTags
            JsonArray tagsArray = requestJson.getAsJsonArray("tags");
            String[] tags = new String[tagsArray.size()];
            for (int i = 0; i < tagsArray.size(); i++) {
                tags[i] = tagsArray.get(i).getAsString();
            }
            
            MetadataWriter metadataWriter = getContext().getMetadataWriter();
            MetadataEntity entity = MetadataEntity.builder()
                .appendAsType(entityType)
                .append("id", entityId)
                .build();
                
            metadataWriter.addTags(entity, tags);
            
            responder.sendString(200, "Tags added successfully", "text/plain");
            
        } catch (Exception e) {
            responder.sendError(500, "Failed to add tags: " + e.getMessage());
        }
    }
}

Data Lineage

Data lineage tracking provides comprehensive visibility into data flow, transformations, and dependencies across the entire data processing pipeline.

Lineage Recording

import io.cdap.cdap.api.lineage.field.*;

/**
 * Records field-level lineage operations for a program run.
 */
public interface LineageRecorder {
    
    /** Records the given lineage operations for this run. */
    void record(Collection<? extends Operation> operations);
    
    /**
     * Flushes recorded lineage. Flushing is automatic for batch programs;
     * streaming programs must call this explicitly.
     */
    void flushLineage() throws IllegalArgumentException;
}

/**
 * Base class for a lineage operation: a named step with a type and the
 * endpoints it reads from and writes to.
 */
public abstract class Operation {
    public Operation(String name, String description, OperationType type, 
                    List<? extends EndPoint> inputs, List<? extends EndPoint> outputs) { 
        /* constructor */ 
    }
    
    public String getName() { /* returns operation name */ }
    public String getDescription() { /* returns operation description */ }
    public OperationType getType() { /* returns operation type */ }
    public List<EndPoint> getInputs() { /* returns input endpoints */ }
    public List<EndPoint> getOutputs() { /* returns output endpoints */ }
}

/**
 * Role of a lineage operation. Constant names follow the standard Java
 * upper-case convention (and match the actual CDAP OperationType enum);
 * the previous version mixed cases (READ, write, transform).
 */
public enum OperationType {
    READ,       // Read operation from an external source
    WRITE,      // Write operation to an external sink
    TRANSFORM   // Transformation operation
}

/** Lineage operation that reads the named fields from a source endpoint. */
public class ReadOperation extends Operation {
    public ReadOperation(String name, String description, EndPoint source, 
                        String... fields) { /* constructor */ }
}

/** Lineage operation that writes the given input fields to a sink endpoint. */
public class WriteOperation extends Operation {
    public WriteOperation(String name, String description, EndPoint sink,
                         InputField... inputs) { /* constructor */ }
}

/** Lineage operation that derives named output fields from input fields. */
public class TransformOperation extends Operation {
    public TransformOperation(String name, String description, 
                            List<InputField> inputs, String... outputs) { /* constructor */ }
}

/**
 * Source or sink of a lineage operation, identified by namespace and
 * name, with optional properties.
 */
public class EndPoint {
    public static EndPoint of(String namespace, String name) { /* create endpoint */ }
    public static EndPoint of(String namespace, String name, Map<String, String> properties) { /* create with props */ }
    
    public String getNamespace() { /* returns namespace */ }
    public String getName() { /* returns name */ }
    public Map<String, String> getProperties() { /* returns properties */ }
}

/**
 * Reference to a field produced by an earlier operation, used to wire
 * operations together into a lineage graph.
 */
public class InputField {
    public static InputField of(String operationName, String fieldName) { /* create input field */ }
    
    public String getOrigin() { /* returns origin operation */ }
    public String getField() { /* returns field name */ }
}

Lineage Implementation Examples

/**
 * Batch job joining customer profiles, purchase history and support
 * tickets into a customer-analytics dataset, recording the full
 * field-level lineage (reads, transforms, write) of the run.
 */
public class CustomerAnalyticsMapReduce extends AbstractMapReduce {
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        
        // Three inputs, one output
        context.setInput(Input.ofDataset("customer_profiles"));
        context.addInput(Input.ofDataset("purchase_history"));
        context.addInput(Input.ofDataset("support_tickets"));
        
        context.setOutput(Output.ofDataset("customer_analytics"));
        
        job.setMapperClass(CustomerAnalyticsMapper.class);
        job.setReducerClass(CustomerAnalyticsReducer.class);
        
        // Record lineage for this MapReduce job
        recordJobLineage(context);
    }
    
    /**
     * Describes this job's data flow as lineage operations: three reads,
     * three transforms, and one write, chained by field via InputField.
     */
    private void recordJobLineage(MapReduceContext context) {
        // The context acts as the LineageRecorder here — this relies on
        // MapReduceContext implementing LineageRecorder (TODO confirm for
        // the CDAP version in use).
        LineageRecorder lineageRecorder = context;
        
        // Define input endpoints
        EndPoint customerProfilesEP = EndPoint.of(context.getNamespace(), "customer_profiles");
        EndPoint purchaseHistoryEP = EndPoint.of(context.getNamespace(), "purchase_history");  
        EndPoint supportTicketsEP = EndPoint.of(context.getNamespace(), "support_tickets");
        
        // Define output endpoint
        EndPoint customerAnalyticsEP = EndPoint.of(context.getNamespace(), "customer_analytics");
        
        List<Operation> operations = new ArrayList<>();
        
        // Record read operations (one per input dataset, listing fields)
        operations.add(new ReadOperation("read_customer_profiles", 
            "Read customer profile data", customerProfilesEP,
            "customer_id", "name", "email", "registration_date", "tier"));
            
        operations.add(new ReadOperation("read_purchase_history",
            "Read customer purchase history", purchaseHistoryEP,
            "customer_id", "purchase_date", "amount", "product_category"));
            
        operations.add(new ReadOperation("read_support_tickets", 
            "Read customer support interactions", supportTicketsEP,
            "customer_id", "ticket_date", "issue_type", "resolution_time"));
        
        // Record transformation operations; each InputField names the
        // upstream operation and the field it consumes.
        operations.add(new TransformOperation("calculate_purchase_metrics",
            "Calculate purchase-related metrics per customer",
            Arrays.asList(
                InputField.of("read_purchase_history", "customer_id"),
                InputField.of("read_purchase_history", "amount"),
                InputField.of("read_purchase_history", "purchase_date")
            ),
            "total_spent", "avg_order_value", "purchase_frequency", "last_purchase_date"
        ));
        
        operations.add(new TransformOperation("calculate_support_metrics",
            "Calculate support-related metrics per customer", 
            Arrays.asList(
                InputField.of("read_support_tickets", "customer_id"),
                InputField.of("read_support_tickets", "ticket_date"),
                InputField.of("read_support_tickets", "resolution_time")
            ),
            "total_tickets", "avg_resolution_time", "last_ticket_date"
        ));
        
        operations.add(new TransformOperation("merge_customer_data",
            "Merge profile, purchase, and support data per customer",
            Arrays.asList(
                InputField.of("read_customer_profiles", "customer_id"),
                InputField.of("read_customer_profiles", "name"),
                InputField.of("read_customer_profiles", "email"),
                InputField.of("read_customer_profiles", "tier"),
                InputField.of("calculate_purchase_metrics", "total_spent"),
                InputField.of("calculate_purchase_metrics", "avg_order_value"),
                InputField.of("calculate_purchase_metrics", "purchase_frequency"),
                InputField.of("calculate_support_metrics", "total_tickets"),
                InputField.of("calculate_support_metrics", "avg_resolution_time")
            ),
            "customer_id", "name", "email", "tier", "total_spent", "avg_order_value",
            "purchase_frequency", "total_tickets", "avg_resolution_time", "customer_score"
        ));
        
        // Record the final write of merged fields to the output dataset
        operations.add(new WriteOperation("write_customer_analytics",
            "Write aggregated customer analytics", customerAnalyticsEP,
            InputField.of("merge_customer_data", "customer_id"),
            InputField.of("merge_customer_data", "name"),
            InputField.of("merge_customer_data", "email"),
            InputField.of("merge_customer_data", "tier"),
            InputField.of("merge_customer_data", "total_spent"),
            InputField.of("merge_customer_data", "avg_order_value"),
            InputField.of("merge_customer_data", "purchase_frequency"),
            InputField.of("merge_customer_data", "total_tickets"),
            InputField.of("merge_customer_data", "avg_resolution_time"),
            InputField.of("merge_customer_data", "customer_score")
        ));
        
        // Record all operations for lineage
        lineageRecorder.record(operations);
    }
    
    // Mapper and Reducer implementations would process the actual data
    // while the lineage operations describe the data flow and transformations
}

// Spark program with streaming lineage tracking
/**
 * Example Spark program that joins a Kafka stream of user events with static user
 * profiles, computes per-user/per-category recommendation scores, and records
 * field-level lineage for the streaming pipeline through CDAP's LineageRecorder.
 *
 * NOTE(review): the context is assigned directly to a {@code LineageRecorder}
 * variable in {@link #run} — this assumes {@code SparkClientContext} implements
 * {@code LineageRecorder}; confirm against the CDAP API version in use.
 */
public class RealTimeRecommendationSpark extends AbstractSpark {
    
    /**
     * Builds and starts the streaming job, then blocks until the query terminates.
     *
     * @param context CDAP Spark context; used here for the SparkSession, dataset
     *                access, and lineage recording
     * @throws Exception propagated from Spark setup or stream execution
     */
    @Override
    public void run(SparkClientContext context) throws Exception {
        SparkSession spark = context.getSparkSession();
        // NOTE(review): relies on the context also being a LineageRecorder — verify.
        LineageRecorder lineageRecorder = context;
        
        // Subscribe to the "user-events" Kafka topic as an unbounded streaming source.
        Dataset<Row> userEvents = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "user-events")
            .load();
            
        // Load the static profile dataset via CDAP and convert it to a DataFrame.
        // NOTE(review): assumes fromDataset/saveAsDataset exist on this context type
        // and that getProfileSchema() is defined elsewhere in this class — confirm.
        JavaPairRDD<byte[], Row> profilesRDD = context.fromDataset("user_profiles");
        Dataset<Row> userProfiles = spark.createDataFrame(profilesRDD.map(Tuple2::_2).rdd(), getProfileSchema());
        
        // Score = event_count * 0.8; keep only pairs scoring above the threshold of 5.
        Dataset<Row> recommendations = userEvents
            .join(userProfiles, "user_id")
            .groupBy("user_id", "category")
            .agg(count("event").as("event_count"))
            .withColumn("recommendation_score", expr("event_count * 0.8"))
            .filter("recommendation_score > 5");
            
        // Register the lineage operations once, before the stream starts.
        recordStreamingLineage(lineageRecorder, context.getNamespace());
        
        // Persist each micro-batch to the "user_recommendations" dataset.
        // NOTE(review): the lambda captures 'context' and 'lineageRecorder'; whether
        // that is safe under Spark task serialization depends on the CDAP runtime —
        // confirm this pattern against the CDAP Spark programming guide.
        StreamingQuery query = recommendations
            .writeStream()
            .outputMode("update")
            .foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
                // Row key combines user and category so each pair is upserted in place.
                JavaPairRDD<byte[], Put> outputRDD = batchDF.javaRDD().mapToPair(row -> {
                    String key = row.getAs("user_id") + "_" + row.getAs("category");
                    Put put = new Put(Bytes.toBytes(key));
                    put.add("rec", "user_id", row.getAs("user_id"));
                    put.add("rec", "category", row.getAs("category"));
                    put.add("rec", "score", row.getAs("recommendation_score"));
                    put.add("rec", "batch_id", batchId);
                    return new Tuple2<>(Bytes.toBytes(key), put);
                });
                
                context.saveAsDataset(outputRDD, "user_recommendations");
                
                // Flush lineage every 10th batch (also fires on batch 0) so lineage
                // becomes visible while the long-running streaming program is active.
                if (batchId % 10 == 0) { // Flush every 10 batches
                    lineageRecorder.flushLineage();
                }
            })
            .start();
            
        query.awaitTermination();
    }
    
    /**
     * Records the field-level lineage graph for this pipeline: two read operations
     * (Kafka events, profile dataset), two transforms (join, scoring), and one write
     * to the recommendations dataset. Operation names link outputs to downstream
     * {@code InputField} references, so they must match exactly across operations.
     *
     * @param recorder  lineage sink the operations are recorded into
     * @param namespace CDAP namespace used to qualify dataset endpoints
     */
    private void recordStreamingLineage(LineageRecorder recorder, String namespace) {
        List<Operation> operations = new ArrayList<>();
        
        // Endpoints: the Kafka source is addressed by a logical "kafka" namespace
        // with topic/format properties; the datasets by the program's namespace.
        Map<String, String> kafkaProps = new HashMap<>();
        kafkaProps.put("topic", "user-events");
        kafkaProps.put("format", "json");
        EndPoint userEventsEP = EndPoint.of("kafka", "user-events", kafkaProps);
        EndPoint userProfilesEP = EndPoint.of(namespace, "user_profiles");
        EndPoint recommendationsEP = EndPoint.of(namespace, "user_recommendations");
        
        // Read operations: declare the fields produced by each source.
        operations.add(new ReadOperation("read_user_events",
            "Stream user events from Kafka", userEventsEP,
            "user_id", "event_type", "category", "timestamp"));
            
        operations.add(new ReadOperation("read_user_profiles", 
            "Read static user profile data", userProfilesEP,
            "user_id", "preferences", "demographics"));
        
        // Transform: the join consumes fields from both reads and emits the
        // combined field set (user_id appears once in the output).
        operations.add(new TransformOperation("join_events_profiles",
            "Join streaming events with user profiles",
            Arrays.asList(
                InputField.of("read_user_events", "user_id"),
                InputField.of("read_user_events", "event_type"),
                InputField.of("read_user_events", "category"),
                InputField.of("read_user_profiles", "user_id"),
                InputField.of("read_user_profiles", "preferences")
            ),
            "user_id", "event_type", "category", "preferences"
        ));
        
        // Transform: scoring derives recommendation_score from the joined fields.
        operations.add(new TransformOperation("calculate_recommendations",
            "Calculate recommendation scores based on event patterns",
            Arrays.asList(
                InputField.of("join_events_profiles", "user_id"),
                InputField.of("join_events_profiles", "category"),
                InputField.of("join_events_profiles", "event_type")
            ),
            "user_id", "category", "recommendation_score"
        ));
        
        // Write: the final fields flow from the scoring transform into the sink.
        operations.add(new WriteOperation("write_recommendations",
            "Write real-time recommendations", recommendationsEP,
            InputField.of("calculate_recommendations", "user_id"),
            InputField.of("calculate_recommendations", "category"),
            InputField.of("calculate_recommendations", "recommendation_score")
        ));
        
        recorder.record(operations);
    }
}

The Security & Metadata framework in CDAP enables comprehensive data governance, combining enterprise-grade security features, detailed metadata tracking, and complete field-level data lineage visibility across all data processing operations.

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap

docs

application-framework.md

data-management.md

data-processing.md

index.md

operational.md

plugin-system.md

security-metadata.md

tile.json