# Security & Metadata

CDAP provides comprehensive security and metadata management capabilities for enterprise data governance. These features enable secure storage of credentials, comprehensive metadata tracking, data lineage recording, and access control across all application components.

## Security Framework

### Secure Store

The Secure Store provides encrypted, centralized storage for sensitive configuration data such as passwords, API keys, and certificates.

```java { .api }
11
import io.cdap.cdap.api.security.store.*;
12
13
// Secure store interface for read access
14
@Beta
15
public interface SecureStore {
16
17
// List stored secrets
18
List<SecureStoreMetadata> list(String namespace) throws Exception;
19
20
// Retrieve secret data and metadata
21
SecureStoreData get(String namespace, String name) throws Exception;
22
23
// Retrieve only metadata (default implementation)
24
default SecureStoreMetadata getMetadata(String namespace, String name) throws Exception {
25
return get(namespace, name);
26
}
27
28
// Retrieve only the secret data
29
default byte[] getData(String namespace, String name) throws Exception {
30
return get(namespace, name).get();
31
}
32
}
33
34
// Secure store management interface
35
@Beta
36
public interface SecureStoreManager extends SecureStore {
37
38
// Store a secret
39
void put(String namespace, String name, String data, String description,
40
Map<String, String> properties) throws Exception;
41
42
// Delete a secret
43
void delete(String namespace, String name) throws Exception;
44
}
45
46
// Secure store data container
47
public class SecureStoreData {
48
public byte[] get() { /* returns secret data */ }
49
public String getName() { /* returns secret name */ }
50
public String getDescription() { /* returns description */ }
51
public long getCreationTimeMs() { /* returns creation timestamp */ }
52
public Map<String, String> getProperties() { /* returns properties */ }
53
}
54
55
// Secure store metadata
56
public class SecureStoreMetadata {
57
public String getName() { /* returns secret name */ }
58
public String getDescription() { /* returns description */ }
59
public long getCreationTimeMs() { /* returns creation timestamp */ }
60
public long getLastModifiedTimeMs() { /* returns last modified timestamp */ }
61
public Map<String, String> getProperties() { /* returns properties */ }
62
}
63
```
64
65
### Secure Store Usage Examples
66
67
```java { .api }
68
// Using secure store in applications
69
public class DatabaseConnectorApp extends AbstractApplication {
70
71
@Override
72
public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
73
configurer.setName("DatabaseConnector");
74
configurer.setDescription("Connects to external databases securely");
75
76
// Add a MapReduce program that uses secure credentials
77
configurer.addMapReduce(new SecureDataExtractionMapReduce());
78
79
// Add a service that uses API keys from secure store
80
configurer.addService(new SecureAPIService());
81
}
82
}
83
84
// MapReduce program using secure store
85
public class SecureDataExtractionMapReduce extends AbstractMapReduce {
86
87
@Override
88
public void initialize(MapReduceContext context) throws Exception {
89
Job job = context.getHadoopJob();
90
91
// Retrieve database credentials from secure store
92
SecureStore secureStore = context.getAdmin();
93
SecureStoreData dbPassword = secureStore.get(context.getNamespace(), "db-password");
94
SecureStoreData dbUsername = secureStore.get(context.getNamespace(), "db-username");
95
SecureStoreData dbUrl = secureStore.get(context.getNamespace(), "db-connection-url");
96
97
// Configure database connection securely
98
Configuration conf = job.getConfiguration();
99
conf.set("db.url", new String(dbUrl.get()));
100
conf.set("db.username", new String(dbUsername.get()));
101
conf.set("db.password", new String(dbPassword.get()));
102
103
// Set input/output configuration
104
context.setInput(Input.ofDataset("source_data"));
105
context.setOutput(Output.ofDataset("extracted_data"));
106
107
job.setMapperClass(SecureDataMapper.class);
108
}
109
110
public static class SecureDataMapper extends Mapper<byte[], Row, byte[], Put> {
111
private Connection dbConnection;
112
113
@Override
114
protected void setup(Context context) throws IOException, InterruptedException {
115
Configuration conf = context.getConfiguration();
116
117
// Initialize secure database connection
118
try {
119
String url = conf.get("db.url");
120
String username = conf.get("db.username");
121
String password = conf.get("db.password");
122
123
dbConnection = DriverManager.getConnection(url, username, password);
124
} catch (SQLException e) {
125
throw new IOException("Failed to connect to database", e);
126
}
127
}
128
129
@Override
130
protected void map(byte[] key, Row row, Context context)
131
throws IOException, InterruptedException {
132
133
try {
134
// Perform secure database operations
135
String query = "SELECT enrichment_data FROM lookup_table WHERE id = ?";
136
try (PreparedStatement stmt = dbConnection.prepareStatement(query)) {
137
stmt.setString(1, row.getString("id"));
138
139
try (ResultSet rs = stmt.executeQuery()) {
140
if (rs.next()) {
141
Put put = new Put(key);
142
put.add("original", row.getColumns());
143
put.add("enriched_data", rs.getString("enrichment_data"));
144
put.add("enrichment_timestamp", System.currentTimeMillis());
145
146
context.write(key, put);
147
}
148
}
149
}
150
} catch (SQLException e) {
151
throw new IOException("Database operation failed", e);
152
}
153
}
154
155
@Override
156
protected void cleanup(Context context) throws IOException, InterruptedException {
157
if (dbConnection != null) {
158
try {
159
dbConnection.close();
160
} catch (SQLException e) {
161
// Log error but don't fail the job
162
LOG.warn("Failed to close database connection", e);
163
}
164
}
165
}
166
}
167
}
168
169
// Service using API keys from secure store
170
@Path("/api")
171
public class SecureAPIService extends AbstractHttpServiceHandler {
172
173
private String apiKey;
174
private String apiSecret;
175
176
@Override
177
public void initialize(HttpServiceContext context) throws Exception {
178
super.initialize(context);
179
180
// Retrieve API credentials from secure store
181
SecureStore secureStore = context.getAdmin();
182
apiKey = new String(secureStore.getData(context.getNamespace(), "external-api-key"));
183
apiSecret = new String(secureStore.getData(context.getNamespace(), "external-api-secret"));
184
}
185
186
@GET
187
@Path("/external-data/{id}")
188
public void fetchExternalData(HttpServiceRequest request, HttpServiceResponder responder,
189
@PathParam("id") String id) {
190
try {
191
// Make authenticated API call using secure credentials
192
String externalData = callExternalAPI(id, apiKey, apiSecret);
193
responder.sendJson(200, externalData);
194
} catch (Exception e) {
195
LOG.error("Failed to fetch external data for ID: {}", id, e);
196
responder.sendError(500, "Failed to fetch external data");
197
}
198
}
199
200
private String callExternalAPI(String id, String key, String secret) throws IOException {
201
// Implementation for secure external API calls
202
// Use HTTPS, proper authentication headers, etc.
203
return "{}"; // Placeholder
204
}
205
}
206
207
// Administrative operations for secure store management
208
public class SecureStoreManagementAction extends AbstractCustomAction {
209
210
@Override
211
public void run(CustomActionContext context) throws Exception {
212
SecureStoreManager storeManager = context.getAdmin();
213
String namespace = context.getNamespace();
214
215
// Setup database credentials during deployment
216
Map<String, String> dbProperties = new HashMap<>();
217
dbProperties.put("environment", "production");
218
dbProperties.put("database_type", "postgresql");
219
220
// Store encrypted credentials (would typically come from deployment config)
221
storeManager.put(namespace, "db-username", "prod_db_user",
222
"Production database username", dbProperties);
223
storeManager.put(namespace, "db-password", "secure_password_123",
224
"Production database password", dbProperties);
225
storeManager.put(namespace, "db-connection-url",
226
"jdbc:postgresql://prod-db:5432/main",
227
"Production database URL", dbProperties);
228
229
// Setup API credentials
230
Map<String, String> apiProperties = new HashMap<>();
231
apiProperties.put("service", "external-analytics");
232
apiProperties.put("access_level", "read");
233
234
storeManager.put(namespace, "external-api-key", "api_key_xyz789",
235
"External analytics API key", apiProperties);
236
storeManager.put(namespace, "external-api-secret", "secret_abc456",
237
"External analytics API secret", apiProperties);
238
239
context.getMetrics().count("secure_store.secrets_configured", 5);
240
}
241
}
242
```
243
244
### Access Control
245
246
```java { .api }
247
import io.cdap.cdap.api.security.AccessException;
248
249
// Access exception for security violations
250
public class AccessException extends Exception {
251
public AccessException(String message) { super(message); }
252
public AccessException(String message, Throwable cause) { super(message, cause); }
253
}
254
255
// Security context and access patterns
256
public class SecurityAwareService extends AbstractHttpServiceHandler {
257
258
@GET
259
@Path("/secure-data")
260
public void getSecureData(HttpServiceRequest request, HttpServiceResponder responder) {
261
try {
262
// Validate user permissions
263
validateUserAccess(request);
264
265
// Access secure data
266
String userId = request.getHeader("X-User-ID");
267
Table userDataTable = getContext().getDataset("user_secure_data");
268
269
Row userData = userDataTable.get(Bytes.toBytes(userId));
270
if (userData.isEmpty()) {
271
responder.sendError(404, "User data not found");
272
return;
273
}
274
275
// Filter sensitive fields based on user role
276
JsonObject response = filterSensitiveData(userData, getUserRole(userId));
277
responder.sendJson(200, response);
278
279
} catch (AccessException e) {
280
responder.sendError(403, "Access denied: " + e.getMessage());
281
} catch (Exception e) {
282
LOG.error("Error accessing secure data", e);
283
responder.sendError(500, "Internal error");
284
}
285
}
286
287
private void validateUserAccess(HttpServiceRequest request) throws AccessException {
288
String authToken = request.getHeader("Authorization");
289
if (authToken == null || !isValidToken(authToken)) {
290
throw new AccessException("Invalid or missing authentication token");
291
}
292
293
String userRole = getUserRoleFromToken(authToken);
294
if (!hasDataAccessPermission(userRole)) {
295
throw new AccessException("Insufficient permissions for data access");
296
}
297
}
298
299
private JsonObject filterSensitiveData(Row userData, String userRole) {
300
JsonObject filtered = new JsonObject();
301
302
// Always include basic info
303
filtered.addProperty("id", userData.getString("id"));
304
filtered.addProperty("name", userData.getString("name"));
305
306
// Include sensitive data only for privileged roles
307
if ("admin".equals(userRole) || "data_analyst".equals(userRole)) {
308
filtered.addProperty("ssn", userData.getString("ssn"));
309
filtered.addProperty("salary", userData.getLong("salary"));
310
}
311
312
// Include PII only for admin role
313
if ("admin".equals(userRole)) {
314
filtered.addProperty("address", userData.getString("address"));
315
filtered.addProperty("phone", userData.getString("phone"));
316
}
317
318
return filtered;
319
}
320
321
private boolean isValidToken(String token) {
322
// Implementation for token validation
323
return token.startsWith("Bearer ") && token.length() > 50;
324
}
325
326
private String getUserRoleFromToken(String token) {
327
// Implementation for extracting user role from token
328
return "user"; // Default role
329
}
330
331
private String getUserRole(String userId) {
332
// Implementation for getting user role from user ID
333
return "user"; // Default role
334
}
335
336
private boolean hasDataAccessPermission(String role) {
337
return Arrays.asList("admin", "data_analyst", "user").contains(role);
338
}
339
}
340
```
## Metadata Management

### Metadata Framework

CDAP provides comprehensive metadata management for tracking data schema, properties, tags, and relationships across all application components.

```java { .api }
349
import io.cdap.cdap.api.metadata.*;
350
351
// Metadata reader interface
352
@Beta
353
public interface MetadataReader {
354
355
// Get all metadata for an entity
356
Map<MetadataScope, Metadata> getMetadata(MetadataEntity metadataEntity) throws MetadataException;
357
358
// Get metadata for specific scope
359
Metadata getMetadata(MetadataScope scope, MetadataEntity metadataEntity) throws MetadataException;
360
}
361
362
// Metadata writer interface
363
@Beta
364
public interface MetadataWriter {
365
366
// Add properties to an entity
367
void addProperties(MetadataEntity metadataEntity, Map<String, String> properties);
368
369
// Add tags to an entity
370
void addTags(MetadataEntity metadataEntity, String... tags);
371
void addTags(MetadataEntity metadataEntity, Iterable<String> tags);
372
373
// Remove properties from an entity
374
void removeProperties(MetadataEntity metadataEntity, String... keys);
375
376
// Remove tags from an entity
377
void removeTags(MetadataEntity metadataEntity, String... tags);
378
379
// Remove all metadata for an entity
380
void removeMetadata(MetadataEntity metadataEntity);
381
}
382
383
// Metadata container
384
public class Metadata {
385
public static final Metadata EMPTY = new Metadata(Collections.emptyMap(), Collections.emptySet());
386
387
public Metadata(Map<String, String> properties, Set<String> tags) { /* constructor */ }
388
389
public Map<String, String> getProperties() { /* returns properties */ }
390
public Set<String> getTags() { /* returns tags */ }
391
public boolean isEmpty() { /* returns if metadata is empty */ }
392
}
393
394
// Metadata entity identification
395
public class MetadataEntity {
396
public static Builder builder() { return new Builder(); }
397
398
public String getType() { /* returns entity type */ }
399
public List<String> getValue() { /* returns entity value components */ }
400
401
public static class Builder {
402
public Builder append(String type, String value) { /* append component */ return this; }
403
public Builder appendAsType(String value) { /* append as type */ return this; }
404
public MetadataEntity build() { /* build entity */ }
405
}
406
}
407
408
// Metadata scopes
409
public enum MetadataScope {
410
USER, // User-defined metadata
411
SYSTEM // System-generated metadata
412
}
413
414
// Metadata exception
415
public class MetadataException extends Exception {
416
public MetadataException(String message) { super(message); }
417
public MetadataException(String message, Throwable cause) { super(message, cause); }
418
}
419
```
420
421
### Metadata Usage Examples
422
423
```java { .api }
424
// Metadata management in applications
425
public class DataGovernanceApp extends AbstractApplication {
426
427
@Override
428
public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
429
configurer.setName("DataGovernanceApp");
430
configurer.setDescription("Manages data lineage and metadata");
431
432
// Add program for metadata enrichment
433
configurer.addMapReduce(new MetadataEnrichmentMapReduce());
434
435
// Add service for metadata queries
436
configurer.addService(new MetadataQueryService());
437
438
// Add workflow for periodic metadata updates
439
configurer.addWorkflow(new MetadataMaintenanceWorkflow());
440
}
441
}
442
443
// MapReduce program that enriches data with metadata
444
public class MetadataEnrichmentMapReduce extends AbstractMapReduce {
445
446
@Override
447
public void initialize(MapReduceContext context) throws Exception {
448
Job job = context.getHadoopJob();
449
450
context.setInput(Input.ofDataset("raw_customer_data"));
451
context.setOutput(Output.ofDataset("enriched_customer_data"));
452
453
job.setMapperClass(MetadataEnrichmentMapper.class);
454
job.setReducerClass(MetadataAggregationReducer.class);
455
456
// Add metadata about this processing job
457
MetadataWriter metadataWriter = context.getMetadataWriter();
458
MetadataEntity jobEntity = MetadataEntity.builder()
459
.appendAsType("program")
460
.append("application", context.getApplicationSpecification().getName())
461
.append("program", context.getSpecification().getName())
462
.build();
463
464
// Tag the processing job
465
metadataWriter.addTags(jobEntity, "customer-processing", "batch-enrichment", "daily");
466
467
// Add properties
468
Map<String, String> jobProperties = new HashMap<>();
469
jobProperties.put("data_source", "customer_database");
470
jobProperties.put("processing_type", "enrichment");
471
jobProperties.put("schedule", "daily");
472
jobProperties.put("owner", "data-team");
473
metadataWriter.addProperties(jobEntity, jobProperties);
474
}
475
476
public static class MetadataEnrichmentMapper extends Mapper<byte[], Row, Text, CustomerRecord> {
477
478
private MetadataWriter metadataWriter;
479
480
@Override
481
protected void setup(Context context) throws IOException, InterruptedException {
482
// Access metadata writer from context
483
metadataWriter = ((MapReduceTaskContext<?>) context).getMetadataWriter();
484
}
485
486
@Override
487
protected void map(byte[] key, Row row, Context context)
488
throws IOException, InterruptedException {
489
490
String customerId = row.getString("customer_id");
491
CustomerRecord customer = new CustomerRecord();
492
customer.setId(customerId);
493
customer.setName(row.getString("name"));
494
customer.setEmail(row.getString("email"));
495
customer.setRegistrationDate(row.getLong("registration_date"));
496
497
// Enrich with metadata
498
enrichCustomerWithMetadata(customer);
499
500
// Track data lineage for the customer record
501
trackCustomerDataLineage(customerId, customer);
502
503
context.write(new Text(customerId), customer);
504
}
505
506
private void enrichCustomerWithMetadata(CustomerRecord customer) {
507
// Add computed metadata
508
long daysSinceRegistration =
509
(System.currentTimeMillis() - customer.getRegistrationDate()) / (24 * 60 * 60 * 1000);
510
customer.setDaysSinceRegistration(daysSinceRegistration);
511
512
// Classify customer based on data
513
String customerTier = classifyCustomerTier(customer);
514
customer.setTier(customerTier);
515
516
// Add data quality score
517
double qualityScore = calculateDataQuality(customer);
518
customer.setDataQualityScore(qualityScore);
519
}
520
521
private void trackCustomerDataLineage(String customerId, CustomerRecord customer) {
522
// Create metadata entity for this customer record
523
MetadataEntity customerEntity = MetadataEntity.builder()
524
.appendAsType("customer")
525
.append("id", customerId)
526
.build();
527
528
// Add metadata tags
529
metadataWriter.addTags(customerEntity, "customer-data", "pii", customer.getTier());
530
531
// Add metadata properties
532
Map<String, String> properties = new HashMap<>();
533
properties.put("data_classification", containsPII(customer) ? "sensitive" : "public");
534
properties.put("last_processed", String.valueOf(System.currentTimeMillis()));
535
properties.put("processing_job", "MetadataEnrichmentMapReduce");
536
properties.put("data_quality_score", String.valueOf(customer.getDataQualityScore()));
537
properties.put("customer_tier", customer.getTier());
538
539
metadataWriter.addProperties(customerEntity, properties);
540
}
541
542
private String classifyCustomerTier(CustomerRecord customer) {
543
// Logic to classify customer tier
544
return customer.getDaysSinceRegistration() > 365 ? "gold" : "silver";
545
}
546
547
private double calculateDataQuality(CustomerRecord customer) {
548
// Logic to calculate data quality score
549
double score = 1.0;
550
if (customer.getName() == null || customer.getName().isEmpty()) score -= 0.3;
551
if (customer.getEmail() == null || !isValidEmail(customer.getEmail())) score -= 0.4;
552
return Math.max(0.0, score);
553
}
554
555
private boolean containsPII(CustomerRecord customer) {
556
return customer.getEmail() != null || customer.getName() != null;
557
}
558
559
private boolean isValidEmail(String email) {
560
return email.contains("@") && email.contains(".");
561
}
562
}
563
}
564
565
// Service for querying metadata
566
@Path("/metadata")
567
public class MetadataQueryService extends AbstractHttpServiceHandler {
568
569
@GET
570
@Path("/entity/{type}/{id}")
571
public void getEntityMetadata(HttpServiceRequest request, HttpServiceResponder responder,
572
@PathParam("type") String entityType,
573
@PathParam("id") String entityId) {
574
try {
575
MetadataReader metadataReader = getContext().getMetadataReader();
576
577
MetadataEntity entity = MetadataEntity.builder()
578
.appendAsType(entityType)
579
.append("id", entityId)
580
.build();
581
582
Map<MetadataScope, Metadata> allMetadata = metadataReader.getMetadata(entity);
583
584
JsonObject response = new JsonObject();
585
for (Map.Entry<MetadataScope, Metadata> entry : allMetadata.entrySet()) {
586
JsonObject scopeMetadata = new JsonObject();
587
588
// Add properties
589
JsonObject properties = new JsonObject();
590
for (Map.Entry<String, String> prop : entry.getValue().getProperties().entrySet()) {
591
properties.addProperty(prop.getKey(), prop.getValue());
592
}
593
scopeMetadata.add("properties", properties);
594
595
// Add tags
596
JsonArray tags = new JsonArray();
597
for (String tag : entry.getValue().getTags()) {
598
tags.add(tag);
599
}
600
scopeMetadata.add("tags", tags);
601
602
response.add(entry.getKey().name().toLowerCase(), scopeMetadata);
603
}
604
605
responder.sendJson(200, response);
606
607
} catch (MetadataException e) {
608
responder.sendError(500, "Metadata query failed: " + e.getMessage());
609
} catch (Exception e) {
610
responder.sendError(500, "Internal error: " + e.getMessage());
611
}
612
}
613
614
@POST
615
@Path("/entity/{type}/{id}/tags")
616
public void addEntityTags(HttpServiceRequest request, HttpServiceResponder responder,
617
@PathParam("type") String entityType,
618
@PathParam("id") String entityId) {
619
try {
620
String content = Charset.forName("UTF-8").decode(
621
ByteBuffer.wrap(request.getContent())).toString();
622
JsonObject requestJson = new JsonParser().parse(content).getAsJsonObject();
623
624
JsonArray tagsArray = requestJson.getAsJsonArray("tags");
625
String[] tags = new String[tagsArray.size()];
626
for (int i = 0; i < tagsArray.size(); i++) {
627
tags[i] = tagsArray.get(i).getAsString();
628
}
629
630
MetadataWriter metadataWriter = getContext().getMetadataWriter();
631
MetadataEntity entity = MetadataEntity.builder()
632
.appendAsType(entityType)
633
.append("id", entityId)
634
.build();
635
636
metadataWriter.addTags(entity, tags);
637
638
responder.sendString(200, "Tags added successfully", "text/plain");
639
640
} catch (Exception e) {
641
responder.sendError(500, "Failed to add tags: " + e.getMessage());
642
}
643
}
644
}
645
```
## Data Lineage

Data lineage tracking provides comprehensive visibility into data flow, transformations, and dependencies across the entire data processing pipeline.

### Lineage Recording

```java { .api }
654
import io.cdap.cdap.api.lineage.field.*;
655
656
// Lineage recorder interface
657
public interface LineageRecorder {
658
659
// Record lineage operations
660
void record(Collection<? extends Operation> operations);
661
662
// Flush recorded lineage (automatic for batch programs, manual for streaming)
663
void flushLineage() throws IllegalArgumentException;
664
}
665
666
// Base operation for lineage tracking
667
public abstract class Operation {
668
public Operation(String name, String description, OperationType type,
669
List<? extends EndPoint> inputs, List<? extends EndPoint> outputs) {
670
/* constructor */
671
}
672
673
public String getName() { /* returns operation name */ }
674
public String getDescription() { /* returns operation description */ }
675
public OperationType getType() { /* returns operation type */ }
676
public List<EndPoint> getInputs() { /* returns input endpoints */ }
677
public List<EndPoint> getOutputs() { /* returns output endpoints */ }
678
}
679
680
// Operation types. Constants follow the Java enum convention
// (UPPER_SNAKE_CASE); the original listing inconsistently mixed cases
// ("write", "transform"), which would not match the actual CDAP constants.
public enum OperationType {
  READ,      // Read operation from an external source
  WRITE,     // Write operation to an external sink
  TRANSFORM  // Transformation operation
}
686
687
// Specific operation types
688
public class ReadOperation extends Operation {
689
public ReadOperation(String name, String description, EndPoint source,
690
String... fields) { /* constructor */ }
691
}
692
693
public class WriteOperation extends Operation {
694
public WriteOperation(String name, String description, EndPoint sink,
695
InputField... inputs) { /* constructor */ }
696
}
697
698
public class TransformOperation extends Operation {
699
public TransformOperation(String name, String description,
700
List<InputField> inputs, String... outputs) { /* constructor */ }
701
}
702
703
// Lineage endpoints
704
public class EndPoint {
705
public static EndPoint of(String namespace, String name) { /* create endpoint */ }
706
public static EndPoint of(String namespace, String name, Map<String, String> properties) { /* create with props */ }
707
708
public String getNamespace() { /* returns namespace */ }
709
public String getName() { /* returns name */ }
710
public Map<String, String> getProperties() { /* returns properties */ }
711
}
712
713
// Input field for lineage tracking
714
public class InputField {
715
public static InputField of(String operationName, String fieldName) { /* create input field */ }
716
717
public String getOrigin() { /* returns origin operation */ }
718
public String getField() { /* returns field name */ }
719
}
720
```
721
722
### Lineage Implementation Examples
723
724
```java { .api }
725
// MapReduce program with comprehensive lineage tracking
726
public class CustomerAnalyticsMapReduce extends AbstractMapReduce {
727
728
@Override
729
public void initialize(MapReduceContext context) throws Exception {
730
Job job = context.getHadoopJob();
731
732
context.setInput(Input.ofDataset("customer_profiles"));
733
context.addInput(Input.ofDataset("purchase_history"));
734
context.addInput(Input.ofDataset("support_tickets"));
735
736
context.setOutput(Output.ofDataset("customer_analytics"));
737
738
job.setMapperClass(CustomerAnalyticsMapper.class);
739
job.setReducerClass(CustomerAnalyticsReducer.class);
740
741
// Record lineage for this MapReduce job
742
recordJobLineage(context);
743
}
744
745
private void recordJobLineage(MapReduceContext context) {
746
LineageRecorder lineageRecorder = context;
747
748
// Define input endpoints
749
EndPoint customerProfilesEP = EndPoint.of(context.getNamespace(), "customer_profiles");
750
EndPoint purchaseHistoryEP = EndPoint.of(context.getNamespace(), "purchase_history");
751
EndPoint supportTicketsEP = EndPoint.of(context.getNamespace(), "support_tickets");
752
753
// Define output endpoint
754
EndPoint customerAnalyticsEP = EndPoint.of(context.getNamespace(), "customer_analytics");
755
756
List<Operation> operations = new ArrayList<>();
757
758
// Record read operations
759
operations.add(new ReadOperation("read_customer_profiles",
760
"Read customer profile data", customerProfilesEP,
761
"customer_id", "name", "email", "registration_date", "tier"));
762
763
operations.add(new ReadOperation("read_purchase_history",
764
"Read customer purchase history", purchaseHistoryEP,
765
"customer_id", "purchase_date", "amount", "product_category"));
766
767
operations.add(new ReadOperation("read_support_tickets",
768
"Read customer support interactions", supportTicketsEP,
769
"customer_id", "ticket_date", "issue_type", "resolution_time"));
770
771
// Record transformation operations
772
operations.add(new TransformOperation("calculate_purchase_metrics",
773
"Calculate purchase-related metrics per customer",
774
Arrays.asList(
775
InputField.of("read_purchase_history", "customer_id"),
776
InputField.of("read_purchase_history", "amount"),
777
InputField.of("read_purchase_history", "purchase_date")
778
),
779
"total_spent", "avg_order_value", "purchase_frequency", "last_purchase_date"
780
));
781
782
operations.add(new TransformOperation("calculate_support_metrics",
783
"Calculate support-related metrics per customer",
784
Arrays.asList(
785
InputField.of("read_support_tickets", "customer_id"),
786
InputField.of("read_support_tickets", "ticket_date"),
787
InputField.of("read_support_tickets", "resolution_time")
788
),
789
"total_tickets", "avg_resolution_time", "last_ticket_date"
790
));
791
792
operations.add(new TransformOperation("merge_customer_data",
793
"Merge profile, purchase, and support data per customer",
794
Arrays.asList(
795
InputField.of("read_customer_profiles", "customer_id"),
796
InputField.of("read_customer_profiles", "name"),
797
InputField.of("read_customer_profiles", "email"),
798
InputField.of("read_customer_profiles", "tier"),
799
InputField.of("calculate_purchase_metrics", "total_spent"),
800
InputField.of("calculate_purchase_metrics", "avg_order_value"),
801
InputField.of("calculate_purchase_metrics", "purchase_frequency"),
802
InputField.of("calculate_support_metrics", "total_tickets"),
803
InputField.of("calculate_support_metrics", "avg_resolution_time")
804
),
805
"customer_id", "name", "email", "tier", "total_spent", "avg_order_value",
806
"purchase_frequency", "total_tickets", "avg_resolution_time", "customer_score"
807
));
808
809
// Record write operation
810
operations.add(new WriteOperation("write_customer_analytics",
811
"Write aggregated customer analytics", customerAnalyticsEP,
812
InputField.of("merge_customer_data", "customer_id"),
813
InputField.of("merge_customer_data", "name"),
814
InputField.of("merge_customer_data", "email"),
815
InputField.of("merge_customer_data", "tier"),
816
InputField.of("merge_customer_data", "total_spent"),
817
InputField.of("merge_customer_data", "avg_order_value"),
818
InputField.of("merge_customer_data", "purchase_frequency"),
819
InputField.of("merge_customer_data", "total_tickets"),
820
InputField.of("merge_customer_data", "avg_resolution_time"),
821
InputField.of("merge_customer_data", "customer_score")
822
));
823
824
// Record all operations for lineage
825
lineageRecorder.record(operations);
826
}
827
828
// Mapper and Reducer implementations would process the actual data
829
// while the lineage operations describe the data flow and transformations
830
}
831
832
// Spark program with streaming lineage tracking
833
public class RealTimeRecommendationSpark extends AbstractSpark {
834
835
@Override
836
public void run(SparkClientContext context) throws Exception {
837
SparkSession spark = context.getSparkSession();
838
LineageRecorder lineageRecorder = context;
839
840
// Read streaming data
841
Dataset<Row> userEvents = spark
842
.readStream()
843
.format("kafka")
844
.option("kafka.bootstrap.servers", "localhost:9092")
845
.option("subscribe", "user-events")
846
.load();
847
848
// Read static user profiles
849
JavaPairRDD<byte[], Row> profilesRDD = context.fromDataset("user_profiles");
850
Dataset<Row> userProfiles = spark.createDataFrame(profilesRDD.map(Tuple2::_2).rdd(), getProfileSchema());
851
852
// Process and generate recommendations
853
Dataset<Row> recommendations = userEvents
854
.join(userProfiles, "user_id")
855
.groupBy("user_id", "category")
856
.agg(count("event").as("event_count"))
857
.withColumn("recommendation_score", expr("event_count * 0.8"))
858
.filter("recommendation_score > 5");
859
860
// Record lineage for streaming operations
861
recordStreamingLineage(lineageRecorder, context.getNamespace());
862
863
// Write recommendations and flush lineage periodically
864
StreamingQuery query = recommendations
865
.writeStream()
866
.outputMode("update")
867
.foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
868
// Convert and save to dataset
869
JavaPairRDD<byte[], Put> outputRDD = batchDF.javaRDD().mapToPair(row -> {
870
String key = row.getAs("user_id") + "_" + row.getAs("category");
871
Put put = new Put(Bytes.toBytes(key));
872
put.add("rec", "user_id", row.getAs("user_id"));
873
put.add("rec", "category", row.getAs("category"));
874
put.add("rec", "score", row.getAs("recommendation_score"));
875
put.add("rec", "batch_id", batchId);
876
return new Tuple2<>(Bytes.toBytes(key), put);
877
});
878
879
context.saveAsDataset(outputRDD, "user_recommendations");
880
881
// Manually flush lineage for streaming program
882
if (batchId % 10 == 0) { // Flush every 10 batches
883
lineageRecorder.flushLineage();
884
}
885
})
886
.start();
887
888
query.awaitTermination();
889
}
890
891
private void recordStreamingLineage(LineageRecorder recorder, String namespace) {
892
List<Operation> operations = new ArrayList<>();
893
894
// Input endpoints
895
Map<String, String> kafkaProps = new HashMap<>();
896
kafkaProps.put("topic", "user-events");
897
kafkaProps.put("format", "json");
898
EndPoint userEventsEP = EndPoint.of("kafka", "user-events", kafkaProps);
899
EndPoint userProfilesEP = EndPoint.of(namespace, "user_profiles");
900
EndPoint recommendationsEP = EndPoint.of(namespace, "user_recommendations");
901
902
// Read operations
903
operations.add(new ReadOperation("read_user_events",
904
"Stream user events from Kafka", userEventsEP,
905
"user_id", "event_type", "category", "timestamp"));
906
907
operations.add(new ReadOperation("read_user_profiles",
908
"Read static user profile data", userProfilesEP,
909
"user_id", "preferences", "demographics"));
910
911
// Transform operations
912
operations.add(new TransformOperation("join_events_profiles",
913
"Join streaming events with user profiles",
914
Arrays.asList(
915
InputField.of("read_user_events", "user_id"),
916
InputField.of("read_user_events", "event_type"),
917
InputField.of("read_user_events", "category"),
918
InputField.of("read_user_profiles", "user_id"),
919
InputField.of("read_user_profiles", "preferences")
920
),
921
"user_id", "event_type", "category", "preferences"
922
));
923
924
operations.add(new TransformOperation("calculate_recommendations",
925
"Calculate recommendation scores based on event patterns",
926
Arrays.asList(
927
InputField.of("join_events_profiles", "user_id"),
928
InputField.of("join_events_profiles", "category"),
929
InputField.of("join_events_profiles", "event_type")
930
),
931
"user_id", "category", "recommendation_score"
932
));
933
934
// Write operation
935
operations.add(new WriteOperation("write_recommendations",
936
"Write real-time recommendations", recommendationsEP,
937
InputField.of("calculate_recommendations", "user_id"),
938
InputField.of("calculate_recommendations", "category"),
939
InputField.of("calculate_recommendations", "recommendation_score")
940
));
941
942
recorder.record(operations);
943
}
944
}
945
```
The Security & Metadata framework in CDAP enables comprehensive data governance with enterprise-grade security features, detailed metadata tracking, and complete data lineage visibility across all data processing operations.