0
# Data Operations and Utilities
1
2
This section covers additional data operations — lineage tracking, preferences management, dataset modules, workflow management, and system utilities — that complement the core CDAP functionality.
3
4
## LineageClient
5
6
```java { .api }
7
public class LineageClient {
8
// Constructors
9
public LineageClient(ClientConfig config);
10
public LineageClient(ClientConfig config, RESTClient restClient);
11
12
// Data lineage operations
13
public Lineage getLineage(DatasetId dataset, long startTime, long endTime, int levels);
14
public FieldLineage getFieldLineage(DatasetId dataset, String field, long startTime, long endTime);
15
public Set<DatasetId> getDatasetDependencies(DatasetId dataset, long startTime, long endTime);
16
public Set<ProgramId> getDatasetConsumers(DatasetId dataset, long startTime, long endTime);
17
public Set<ProgramId> getDatasetProducers(DatasetId dataset, long startTime, long endTime);
18
}
19
```
20
21
## PreferencesClient
22
23
```java { .api }
24
public class PreferencesClient {
25
// Constructors
26
public PreferencesClient(ClientConfig config);
27
public PreferencesClient(ClientConfig config, RESTClient restClient);
28
29
// Namespace preferences
30
public void setPreferences(NamespaceId namespace, Map<String, String> preferences);
31
public Map<String, String> getPreferences(NamespaceId namespace);
32
public void deletePreferences(NamespaceId namespace);
33
34
// Application preferences
35
public void setApplicationPreferences(ApplicationId application, Map<String, String> preferences);
36
public Map<String, String> getApplicationPreferences(ApplicationId application);
37
public void deleteApplicationPreferences(ApplicationId application);
38
39
// Program preferences
40
public void setProgramPreferences(ProgramId program, Map<String, String> preferences);
41
public Map<String, String> getProgramPreferences(ProgramId program);
42
public void deleteProgramPreferences(ProgramId program);
43
44
// Resolved preferences (with inheritance)
45
public Map<String, String> getResolvedPreferences(ProgramId program);
46
}
47
```
48
49
## Dataset Module and Type Management
50
51
### DatasetModuleClient
52
53
```java { .api }
54
public class DatasetModuleClient {
55
// Constructors
56
public DatasetModuleClient(ClientConfig config);
57
public DatasetModuleClient(ClientConfig config, RESTClient restClient);
58
59
// Dataset module management
60
public void deploy(NamespaceId namespace, String moduleName, String className, File jarFile);
61
public void deploy(DatasetModuleId moduleId, String className, File jarFile);
62
public List<DatasetModuleMeta> list(NamespaceId namespace);
63
public DatasetModuleMeta get(DatasetModuleId moduleId);
64
public void delete(DatasetModuleId moduleId);
65
public void deleteAll(NamespaceId namespace);
66
}
67
```
68
69
### DatasetTypeClient
70
71
```java { .api }
72
public class DatasetTypeClient {
73
// Constructors
74
public DatasetTypeClient(ClientConfig config);
75
public DatasetTypeClient(ClientConfig config, RESTClient restClient);
76
77
// Dataset type operations
78
public List<DatasetTypeMeta> list(NamespaceId namespace);
79
public DatasetTypeMeta get(DatasetTypeId typeId);
80
}
81
```
82
83
## Workflow Management
84
85
### WorkflowClient
86
87
```java { .api }
88
public class WorkflowClient {
89
// Constructors
90
public WorkflowClient(ClientConfig config);
91
public WorkflowClient(ClientConfig config, RESTClient restClient);
92
93
// Workflow token operations
94
public WorkflowToken getWorkflowToken(ProgramRunId workflowRun);
95
public WorkflowToken getWorkflowToken(ProgramRunId workflowRun, WorkflowToken.Scope scope);
96
public WorkflowToken getWorkflowToken(ProgramRunId workflowRun, WorkflowToken.Scope scope, String key);
97
98
// Workflow state operations
99
public List<WorkflowNodeStateDetail> getNodeStates(ProgramRunId workflowRun);
100
public WorkflowNodeStateDetail getNodeState(ProgramRunId workflowRun, String nodeId);
101
102
// Workflow statistics
103
public WorkflowStatistics getStatistics(ProgramId workflow, long startTime, long endTime);
104
}
105
```
106
107
## System Information
108
109
### MetaClient
110
111
```java { .api }
112
public class MetaClient {
113
// Constructors
114
public MetaClient(ClientConfig config);
115
public MetaClient(ClientConfig config, RESTClient restClient);
116
117
// System information
118
public String getVersion();
119
public void ping();
120
public Map<String, String> getConfiguration();
121
public CDAPCapabilities getCapabilities();
122
}
123
```
124
125
## Data Lineage Operations
126
127
### Basic Lineage Tracking
128
129
```java
130
// Get lineage for a dataset
131
DatasetId userProfiles = DatasetId.of(namespace, "user-profiles");
132
long endTime = System.currentTimeMillis();
133
long startTime = endTime - TimeUnit.DAYS.toMillis(7); // Last 7 days
134
int levels = 3; // 3 levels of lineage
135
136
Lineage lineage = lineageClient.getLineage(userProfiles, startTime, endTime, levels);
137
138
System.out.println("Lineage for dataset: " + userProfiles.getDataset());
139
System.out.println("Programs that read from this dataset:");
140
for (ProgramId consumer : lineage.getConsumers()) {
141
System.out.println("- " + consumer.getApplication() + "." + consumer.getProgram() +
142
" (" + consumer.getType() + ")");
143
}
144
145
System.out.println("Programs that write to this dataset:");
146
for (ProgramId producer : lineage.getProducers()) {
147
System.out.println("- " + producer.getApplication() + "." + producer.getProgram() +
148
" (" + producer.getType() + ")");
149
}
150
151
// Get upstream and downstream datasets
152
Set<DatasetId> upstreamDatasets = lineage.getUpstreamDatasets();
153
Set<DatasetId> downstreamDatasets = lineage.getDownstreamDatasets();
154
155
System.out.println("Upstream datasets: " + upstreamDatasets);
156
System.out.println("Downstream datasets: " + downstreamDatasets);
157
```
158
159
### Field-Level Lineage
160
161
```java
162
// Get field lineage for specific field
163
String fieldName = "user_id";
164
FieldLineage fieldLineage = lineageClient.getFieldLineage(userProfiles, fieldName, startTime, endTime);
165
166
System.out.println("Field lineage for " + fieldName + ":");
167
System.out.println("Source fields: " + fieldLineage.getSourceFields());
168
System.out.println("Destination fields: " + fieldLineage.getDestinationFields());
169
170
// Analyze field transformations
171
for (FieldTransformation transformation : fieldLineage.getTransformations()) {
172
System.out.println("Transformation in " + transformation.getProgram() + ":");
173
System.out.println(" From: " + transformation.getSourceField());
174
System.out.println(" To: " + transformation.getDestinationField());
175
System.out.println(" Operation: " + transformation.getOperation());
176
}
177
```
178
179
### Dependency Analysis
180
181
```java
182
// Get dataset dependencies
183
Set<DatasetId> dependencies = lineageClient.getDatasetDependencies(userProfiles, startTime, endTime);
184
System.out.println("Datasets that " + userProfiles.getDataset() + " depends on: " + dependencies);
185
186
// Get consumers and producers
187
Set<ProgramId> consumers = lineageClient.getDatasetConsumers(userProfiles, startTime, endTime);
188
Set<ProgramId> producers = lineageClient.getDatasetProducers(userProfiles, startTime, endTime);
189
190
System.out.println("Programs consuming from " + userProfiles.getDataset() + ":");
191
consumers.forEach(program ->
192
System.out.println("- " + program.getApplication() + "." + program.getProgram()));
193
194
System.out.println("Programs producing to " + userProfiles.getDataset() + ":");
195
producers.forEach(program ->
196
System.out.println("- " + program.getApplication() + "." + program.getProgram()));
197
```
198
199
## Preferences Management
200
201
### Namespace-Level Preferences
202
203
```java
204
// Set namespace preferences
205
Map<String, String> namespacePrefs = Map.of(
206
"default.batch.size", "1000",
207
"default.timeout.minutes", "30",
208
"log.level", "INFO",
209
"metrics.collection.enabled", "true",
210
"data.retention.default.days", "90"
211
);
212
213
preferencesClient.setPreferences(namespace, namespacePrefs);
214
System.out.println("Set namespace preferences");
215
216
// Get namespace preferences
217
Map<String, String> retrievedPrefs = preferencesClient.getPreferences(namespace);
218
System.out.println("Namespace preferences: " + retrievedPrefs);
219
220
// Update specific preference
221
Map<String, String> updatedPrefs = new HashMap<>(retrievedPrefs);
222
updatedPrefs.put("log.level", "DEBUG");
223
preferencesClient.setPreferences(namespace, updatedPrefs);
224
```
225
226
### Application-Level Preferences
227
228
```java
229
// Set application preferences
230
ApplicationId appId = ApplicationId.of(namespace, "data-pipeline", "1.0.0");
231
Map<String, String> appPrefs = Map.of(
232
"input.path", "/data/pipeline/input",
233
"output.path", "/data/pipeline/output",
234
"parallel.workers", "4",
235
"checkpoint.interval", "60000",
236
"error.handling", "skip"
237
);
238
239
preferencesClient.setApplicationPreferences(appId, appPrefs);
240
System.out.println("Set application preferences for: " + appId.getApplication());
241
242
// Get application preferences
243
Map<String, String> appRetrievedPrefs = preferencesClient.getApplicationPreferences(appId);
244
System.out.println("Application preferences: " + appRetrievedPrefs);
245
```
246
247
### Program-Level Preferences
248
249
```java
250
// Set program-specific preferences
251
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow");
252
Map<String, String> programPrefs = Map.of(
253
"workflow.timeout", "7200000", // 2 hours
254
"max.retries", "3",
255
"retry.delay.seconds", "30",
256
"notification.email", "team@company.com"
257
);
258
259
preferencesClient.setProgramPreferences(workflowId, programPrefs);
260
261
// Get resolved preferences (with inheritance from namespace and application)
262
Map<String, String> resolvedPrefs = preferencesClient.getResolvedPreferences(workflowId);
263
System.out.println("Resolved preferences for " + workflowId.getProgram() + ": " + resolvedPrefs);
264
265
// Analyze preference inheritance
266
System.out.println("Preference resolution order (highest to lowest priority):");
267
System.out.println("1. Program level: " + preferencesClient.getProgramPreferences(workflowId).keySet());
268
System.out.println("2. Application level: " + preferencesClient.getApplicationPreferences(appId).keySet());
269
System.out.println("3. Namespace level: " + preferencesClient.getPreferences(namespace).keySet());
270
```
271
272
## Dataset Module Management
273
274
### Module Deployment
275
276
```java
277
// Deploy custom dataset module
278
File moduleJar = new File("/path/to/custom-dataset-module.jar");
279
String moduleName = "custom-table-module";
280
String mainClassName = "com.company.dataset.CustomTableModule";
281
282
datasetModuleClient.deploy(namespace, moduleName, mainClassName, moduleJar);
283
System.out.println("Deployed dataset module: " + moduleName);
284
285
// Deploy with module ID
286
DatasetModuleId moduleId = DatasetModuleId.of(namespace, moduleName);
287
datasetModuleClient.deploy(moduleId, mainClassName, moduleJar);
288
289
// List deployed modules
290
List<DatasetModuleMeta> modules = datasetModuleClient.list(namespace);
291
System.out.println("Deployed dataset modules:");
292
for (DatasetModuleMeta module : modules) {
293
System.out.println("- " + module.getName());
294
System.out.println(" Class: " + module.getClassName());
295
System.out.println(" JAR: " + module.getJarLocationPath());
296
System.out.println(" Types: " + module.getTypes());
297
}
298
```
299
300
### Module Information and Management
301
302
```java
303
// Get specific module information
304
DatasetModuleMeta moduleInfo = datasetModuleClient.get(moduleId);
305
System.out.println("Module: " + moduleInfo.getName());
306
System.out.println("Class: " + moduleInfo.getClassName());
307
System.out.println("JAR location: " + moduleInfo.getJarLocationPath());
308
System.out.println("Provided types: " + moduleInfo.getTypes());
309
310
// Delete module
311
try {
312
datasetModuleClient.delete(moduleId);
313
System.out.println("Deleted module: " + moduleName);
314
} catch (DatasetModuleInUseException e) {
315
System.err.println("Cannot delete module - datasets depend on it: " + e.getMessage());
316
}
317
318
// Clean up all modules in namespace (use with caution!)
319
// datasetModuleClient.deleteAll(namespace);
320
```
321
322
### Dataset Type Operations
323
324
```java
325
// List available dataset types
326
List<DatasetTypeMeta> types = datasetTypeClient.list(namespace);
327
System.out.println("Available dataset types:");
328
for (DatasetTypeMeta type : types) {
329
System.out.println("- " + type.getName());
330
System.out.println(" Modules: " + type.getModules());
331
}
332
333
// Get specific type information
334
DatasetTypeId typeId = DatasetTypeId.of(namespace, "custom-table");
335
try {
336
DatasetTypeMeta typeInfo = datasetTypeClient.get(typeId);
337
System.out.println("Type: " + typeInfo.getName());
338
System.out.println("Modules: " + typeInfo.getModules());
339
} catch (DatasetTypeNotFoundException e) {
340
System.err.println("Dataset type not found: " + typeId.getType());
341
}
342
```
343
344
## Workflow Operations
345
346
### Workflow Token Management
347
348
```java
349
// Get workflow token for a specific run
350
ApplicationId appId = ApplicationId.of(namespace, "analytics-app", "1.0.0");
351
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "data-processing");
352
String runId = "run-12345"; // Obtained from program runs
353
ProgramRunId workflowRunId = ProgramRunId.of(workflowId, runId);
354
355
WorkflowToken token = workflowClient.getWorkflowToken(workflowRunId);
356
System.out.println("Workflow token for run: " + runId);
357
358
// Get all token data
359
Map<String, WorkflowToken.Value> allTokens = token.getAll();
360
for (Map.Entry<String, WorkflowToken.Value> entry : allTokens.entrySet()) {
361
WorkflowToken.Value value = entry.getValue();
362
System.out.println("Key: " + entry.getKey());
363
System.out.println(" Value: " + value.toString());
364
System.out.println(" Node: " + value.getNodeName());
365
}
366
367
// Get tokens by scope
368
WorkflowToken systemTokens = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.SYSTEM);
369
WorkflowToken userTokens = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.USER);
370
371
System.out.println("System tokens: " + systemTokens.getAll().keySet());
372
System.out.println("User tokens: " + userTokens.getAll().keySet());
373
374
// Get specific token value
375
String specificKey = "processed.records.count";
376
WorkflowToken specificToken = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.USER, specificKey);
377
if (specificToken.getAll().containsKey(specificKey)) {
378
WorkflowToken.Value recordCount = specificToken.get(specificKey);
379
System.out.println("Processed records: " + recordCount.toString());
380
}
381
```
382
383
### Workflow State Tracking
384
385
```java
386
// Get workflow node states
387
List<WorkflowNodeStateDetail> nodeStates = workflowClient.getNodeStates(workflowRunId);
388
System.out.println("Workflow nodes (" + nodeStates.size() + "):");
389
390
for (WorkflowNodeStateDetail nodeState : nodeStates) {
391
System.out.println("- Node: " + nodeState.getNodeId());
392
System.out.println(" Status: " + nodeState.getNodeStatus());
393
System.out.println(" Type: " + nodeState.getNodeType());
394
System.out.println(" Start: " + new Date(nodeState.getStartTime()));
395
396
if (nodeState.getEndTime() != null) {
397
System.out.println(" End: " + new Date(nodeState.getEndTime()));
398
long duration = nodeState.getEndTime() - nodeState.getStartTime();
399
System.out.println(" Duration: " + duration + " ms");
400
}
401
402
if (nodeState.getFailureCause() != null) {
403
System.out.println(" Failure: " + nodeState.getFailureCause());
404
}
405
}
406
407
// Get specific node state
408
String nodeId = "data-validation-action";
409
WorkflowNodeStateDetail specificNode = workflowClient.getNodeState(workflowRunId, nodeId);
410
System.out.println("Node " + nodeId + " status: " + specificNode.getNodeStatus());
411
```
412
413
### Workflow Statistics
414
415
```java
416
// Get workflow statistics over time period
417
long endTime = System.currentTimeMillis();
418
long startTime = endTime - TimeUnit.DAYS.toMillis(30); // Last 30 days
419
420
WorkflowStatistics stats = workflowClient.getStatistics(workflowId, startTime, endTime);
421
System.out.println("Workflow statistics for " + workflowId.getProgram() + ":");
422
System.out.println("Total runs: " + stats.getTotalRuns());
423
System.out.println("Successful runs: " + stats.getSuccessfulRuns());
424
System.out.println("Failed runs: " + stats.getFailedRuns());
425
System.out.println("Average duration: " + stats.getAverageDuration() + " ms");
426
427
// Node-level statistics
428
Map<String, WorkflowNodeStatistics> nodeStats = stats.getNodeStatistics();
429
for (Map.Entry<String, WorkflowNodeStatistics> entry : nodeStats.entrySet()) {
430
WorkflowNodeStatistics nodeStat = entry.getValue();
431
System.out.println("Node " + entry.getKey() + ":");
432
System.out.println(" Success rate: " + (nodeStat.getSuccessfulRuns() * 100.0 / nodeStat.getTotalRuns()) + "%");
433
System.out.println(" Average duration: " + nodeStat.getAverageDuration() + " ms");
434
}
435
```
436
437
## System Information and Health
438
439
### System Metadata
440
441
```java
442
// Get CDAP version
443
String version = metaClient.getVersion();
444
System.out.println("CDAP Version: " + version);
445
446
// Ping CDAP instance
447
try {
448
metaClient.ping();
449
System.out.println("CDAP instance is responsive");
450
} catch (IOException e) {
451
System.err.println("CDAP instance is not responsive: " + e.getMessage());
452
}
453
454
// Get system configuration
455
Map<String, String> config = metaClient.getConfiguration();
456
System.out.println("System configuration:");
457
for (Map.Entry<String, String> entry : config.entrySet()) {
458
if (entry.getKey().contains("password") || entry.getKey().contains("secret")) {
459
System.out.println(" " + entry.getKey() + ": [REDACTED]");
460
} else {
461
System.out.println(" " + entry.getKey() + ": " + entry.getValue());
462
}
463
}
464
465
// Get system capabilities
466
CDAPCapabilities capabilities = metaClient.getCapabilities();
467
System.out.println("System capabilities:");
468
System.out.println(" Security enabled: " + capabilities.isSecurityEnabled());
469
System.out.println(" Namespaces supported: " + capabilities.isNamespacesSupported());
470
System.out.println(" Audit logging: " + capabilities.isAuditLoggingEnabled());
471
```
472
473
## Advanced Data Operations
474
475
### Comprehensive Data Pipeline Analysis
476
477
```java
478
public class DataPipelineAnalyzer {
479
private final LineageClient lineageClient;
480
private final PreferencesClient preferencesClient;
481
private final WorkflowClient workflowClient;
482
483
public DataPipelineAnalyzer(LineageClient lineageClient,
484
PreferencesClient preferencesClient,
485
WorkflowClient workflowClient) {
486
this.lineageClient = lineageClient;
487
this.preferencesClient = preferencesClient;
488
this.workflowClient = workflowClient;
489
}
490
491
public PipelineAnalysisReport analyzePipeline(DatasetId dataset, long analysisWindowDays) {
492
long endTime = System.currentTimeMillis();
493
long startTime = endTime - TimeUnit.DAYS.toMillis(analysisWindowDays);
494
495
PipelineAnalysisReport.Builder reportBuilder = PipelineAnalysisReport.builder()
496
.dataset(dataset)
497
.analysisWindow(startTime, endTime);
498
499
try {
500
// Get lineage information
501
Lineage lineage = lineageClient.getLineage(dataset, startTime, endTime, 5);
502
reportBuilder.lineage(lineage);
503
504
// Analyze producers and consumers
505
Set<ProgramId> producers = lineage.getProducers();
506
Set<ProgramId> consumers = lineage.getConsumers();
507
508
reportBuilder.producers(producers).consumers(consumers);
509
510
// Get workflow statistics for each producer/consumer
511
for (ProgramId program : producers) {
512
if (program.getType() == ProgramType.WORKFLOW) {
513
WorkflowStatistics stats = workflowClient.getStatistics(program, startTime, endTime);
514
reportBuilder.addWorkflowStats(program, stats);
515
}
516
}
517
518
for (ProgramId program : consumers) {
519
if (program.getType() == ProgramType.WORKFLOW) {
520
WorkflowStatistics stats = workflowClient.getStatistics(program, startTime, endTime);
521
reportBuilder.addWorkflowStats(program, stats);
522
}
523
}
524
525
// Get preferences for involved applications
526
Set<ApplicationId> applications = new HashSet<>();
527
producers.forEach(p -> applications.add(p.getApplication()));
528
consumers.forEach(p -> applications.add(p.getApplication()));
529
530
for (ApplicationId app : applications) {
531
Map<String, String> prefs = preferencesClient.getApplicationPreferences(app);
532
reportBuilder.addApplicationPreferences(app, prefs);
533
}
534
535
} catch (Exception e) {
536
reportBuilder.error("Analysis failed: " + e.getMessage());
537
}
538
539
return reportBuilder.build();
540
}
541
}
542
543
// Analysis report data structure
544
public class PipelineAnalysisReport {
545
private final DatasetId dataset;
546
private final long startTime;
547
private final long endTime;
548
private final Lineage lineage;
549
private final Set<ProgramId> producers;
550
private final Set<ProgramId> consumers;
551
private final Map<ProgramId, WorkflowStatistics> workflowStats;
552
private final Map<ApplicationId, Map<String, String>> applicationPreferences;
553
private final String error;
554
555
// Constructor, getters, and builder
556
}
557
```
558
559
## Error Handling
560
561
Data operations may throw these common exceptions:
562
563
- **LineageNotFoundException**: Lineage data is not available for the specified time range
564
- **PreferencesNotFoundException**: Preferences are not set at the requested level
565
- **DatasetModuleNotFoundException**: Dataset module does not exist
566
- **WorkflowNotFoundException**: Workflow or run not found
567
- **UnauthenticatedException**: Authentication required
568
- **UnauthorizedException**: Insufficient permissions
569
570
```java
571
try {
572
Lineage lineage = lineageClient.getLineage(datasetId, startTime, endTime, 3);
573
System.out.println("Lineage retrieved successfully");
574
} catch (LineageNotFoundException e) {
575
System.err.println("No lineage data available for time range");
576
} catch (UnauthorizedException e) {
577
System.err.println("No permission to access lineage data: " + e.getMessage());
578
} catch (IOException e) {
579
System.err.println("Network error: " + e.getMessage());
580
}
581
```
582
583
## Best Practices
584
585
1. **Lineage Tracking**: Regularly analyze data lineage for impact assessment
586
2. **Preference Hierarchy**: Use preference inheritance effectively (namespace → application → program)
587
3. **Module Management**: Keep dataset modules updated and remove unused ones
588
4. **Workflow Monitoring**: Monitor workflow tokens and node states for debugging
589
5. **System Health**: Regularly check system capabilities and configuration
590
6. **Performance Analysis**: Use workflow statistics for performance optimization
591
592
```java
593
// Good: Comprehensive data operations with proper error handling
594
public class DataOperationsManager {
595
private final LineageClient lineageClient;
596
private final PreferencesClient preferencesClient;
597
private final WorkflowClient workflowClient;
598
599
public DataOperationsManager(LineageClient lineageClient,
600
PreferencesClient preferencesClient,
601
WorkflowClient workflowClient) {
602
this.lineageClient = lineageClient;
603
this.preferencesClient = preferencesClient;
604
this.workflowClient = workflowClient;
605
}
606
607
public void setupApplicationDefaults(ApplicationId appId, Map<String, String> defaultPreferences) {
608
try {
609
// Set application preferences
610
preferencesClient.setApplicationPreferences(appId, defaultPreferences);
611
612
// Verify preferences were set
613
Map<String, String> verifyPrefs = preferencesClient.getApplicationPreferences(appId);
614
if (!verifyPrefs.equals(defaultPreferences)) {
615
System.err.println("Preference verification failed for " + appId.getApplication());
616
} else {
617
System.out.println("Successfully configured preferences for " + appId.getApplication());
618
}
619
620
} catch (Exception e) {
621
System.err.println("Failed to set preferences for " + appId.getApplication() + ": " + e.getMessage());
622
throw new RuntimeException("Preference setup failed", e);
623
}
624
}
625
626
public void analyzeDatasetImpact(DatasetId datasetId) {
627
try {
628
long endTime = System.currentTimeMillis();
629
long startTime = endTime - TimeUnit.DAYS.toMillis(30);
630
631
// Get dataset dependencies
632
Set<ProgramId> consumers = lineageClient.getDatasetConsumers(datasetId, startTime, endTime);
633
Set<ProgramId> producers = lineageClient.getDatasetProducers(datasetId, startTime, endTime);
634
635
System.out.println("Impact analysis for dataset: " + datasetId.getDataset());
636
System.out.println("Affected consumers: " + consumers.size());
637
System.out.println("Data producers: " + producers.size());
638
639
// Warn if dataset has many dependencies
640
if (consumers.size() > 10) {
641
System.out.println("WARNING: Dataset has many consumers - changes may have wide impact");
642
}
643
644
} catch (Exception e) {
645
System.err.println("Impact analysis failed for " + datasetId.getDataset() + ": " + e.getMessage());
646
}
647
}
648
}
649
```