# Usage Registry

The Usage Registry tracks which programs use which datasets and streams, providing the relationship data needed for governance, lineage analysis, and impact assessment across the CDAP platform. It answers both forward queries (the datasets and streams an application or program uses) and reverse lookups (the programs that use a given dataset or stream).

## Capabilities

### Core Usage Registry Operations

`UsageRegistry` is the primary interface for recording and querying program-dataset and program-stream relationships. It extends `UsageWriter`, which registers relationships, and adds bidirectional queries along with application-level cleanup.

```java { .api }
public interface UsageRegistry extends UsageWriter {
  // Application lifecycle management
  void unregister(ApplicationId applicationId);

  // Dataset relationship queries
  Set<DatasetId> getDatasets(ApplicationId id);
  Set<DatasetId> getDatasets(ProgramId id);

  // Stream relationship queries
  Set<StreamId> getStreams(ApplicationId id);
  Set<StreamId> getStreams(ProgramId id);

  // Program relationship queries (reverse lookups)
  Set<ProgramId> getPrograms(DatasetId id);
  Set<ProgramId> getPrograms(StreamId id);
}

public interface UsageWriter {
  // Usage registration methods for tracking relationships
  void register(ProgramId programId, DatasetId datasetId);
  void register(ProgramId programId, StreamId streamId);
  void registerAll(Iterable<? extends ProgramId> programIds, DatasetId datasetId);
  void registerAll(Iterable<? extends ProgramId> programIds, StreamId streamId);
}
```

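Registration and the two query directions compose into a simple round trip. A minimal sketch (the identifiers are illustrative, and `usageRegistry` is obtained as in the fuller walkthroughs under Usage Examples below):

```java
// Forward direction: what does this program use?
ProgramId processor = NamespaceId.DEFAULT.app("dataProcessingApp").mr("dataProcessor");
DatasetId rawData = NamespaceId.DEFAULT.dataset("rawData");

usageRegistry.register(processor, rawData);
Set<DatasetId> usedDatasets = usageRegistry.getDatasets(processor);  // contains rawData

// Reverse direction: which programs use this dataset?
Set<ProgramId> users = usageRegistry.getPrograms(rawData);           // contains processor
```
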
### Usage Registry Implementations

Several implementations cover different deployment and testing scenarios: a persistent registry for standard deployments, a no-op registry for test environments, and a messaging-based writer that records usage asynchronously in distributed runtimes.

```java { .api }
// Standard usage registry implementation
public class BasicUsageRegistry implements UsageRegistry {
  // Full-featured usage tracking with persistent storage
}

// No-operation implementation for testing
public class NoOpUsageRegistry implements UsageRegistry {
  // Null object pattern implementation for testing scenarios
}

// Messaging-based usage writer for distributed systems
public class MessagingUsageWriter implements UsageWriter {
  // Asynchronous usage tracking via messaging system
}
```

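Which implementation backs the `UsageRegistry` interface is normally decided at wiring time rather than in application code. Below is a minimal sketch assuming Guice-style injection; the module name, constructor flag, and bindings are illustrative assumptions, not actual CDAP modules.

```java
import com.google.inject.AbstractModule;

// Hypothetical wiring module (illustrative only): selects the UsageRegistry
// implementation once, so callers depend only on the interface.
public class UsageRegistryModule extends AbstractModule {
  private final boolean noOpForTests;

  public UsageRegistryModule(boolean noOpForTests) {
    this.noOpForTests = noOpForTests;
  }

  @Override
  protected void configure() {
    if (noOpForTests) {
      // Test scenarios: satisfy the interface without recording anything.
      bind(UsageRegistry.class).to(NoOpUsageRegistry.class);
    } else {
      // Standard deployments: persistent usage tracking.
      bind(UsageRegistry.class).to(BasicUsageRegistry.class);
    }
  }
}
```

Consumers then inject `UsageRegistry` (or just `UsageWriter` where only registration is needed) instead of constructing an implementation directly.
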
### Usage Data Models

Internal data structures for representing and managing usage relationships.

```java { .api }
// Dataset usage information structure
public class DatasetUsage {
  public DatasetId getDatasetId();
  public Set<ProgramId> getPrograms();
  public long getTimestamp();
}

// Usage record key for efficient storage and retrieval
public class DatasetUsageKey {
  public DatasetId getDatasetId();
  public ProgramId getProgramId();
  public String getKey();
}
```

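As a brief illustration of how these structures fit together, the sketch below builds a key for a single program-dataset relationship using the factory and accessors listed under Types at the end of this page; how an implementation stores the resulting key is left unspecified.

```java
// Identify one program-dataset relationship.
DatasetId dataset = DatasetId.of("default", "rawData");
ProgramId program = ProgramId.of("default", "dataProcessingApp", ProgramType.MAPREDUCE, "dataProcessor");

// Build the usage record key; getKey() yields the string form an
// implementation could use for storage and retrieval.
DatasetUsageKey key = DatasetUsageKey.of(dataset, program);
System.out.println("Key:     " + key.getKey());
System.out.println("Dataset: " + key.getDatasetId().getDataset());
System.out.println("Program: " + key.getProgramId().getProgram());
```
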
## Usage Examples

### Basic Usage Registration and Queries

```java
// Access usage registry (typically injected)
UsageRegistry usageRegistry = // ... obtain instance

// Define application and program identifiers
ApplicationId appId = NamespaceId.DEFAULT.app("dataProcessingApp");
ProgramId mapReduceProgram = appId.mr("dataProcessor");
ProgramId workflowProgram = appId.workflow("dataWorkflow");

// Define dataset and stream identifiers
DatasetId inputDataset = NamespaceId.DEFAULT.dataset("rawData");
DatasetId outputDataset = NamespaceId.DEFAULT.dataset("processedData");
StreamId inputStream = NamespaceId.DEFAULT.stream("events");

// Register program-dataset relationships
usageRegistry.register(mapReduceProgram, inputDataset);
usageRegistry.register(mapReduceProgram, outputDataset);
usageRegistry.register(workflowProgram, inputDataset);

// Register program-stream relationships
usageRegistry.register(mapReduceProgram, inputStream);

// Query datasets used by application
Set<DatasetId> appDatasets = usageRegistry.getDatasets(appId);
System.out.println("Application datasets: " + appDatasets);

// Query datasets used by specific program
Set<DatasetId> programDatasets = usageRegistry.getDatasets(mapReduceProgram);
System.out.println("MapReduce program datasets: " + programDatasets);

// Query streams used by program
Set<StreamId> programStreams = usageRegistry.getStreams(mapReduceProgram);
System.out.println("MapReduce program streams: " + programStreams);
```

### Impact Analysis and Dependency Discovery

```java
// Find all programs that use a specific dataset (impact analysis)
DatasetId criticalDataset = NamespaceId.DEFAULT.dataset("customerData");
Set<ProgramId> affectedPrograms = usageRegistry.getPrograms(criticalDataset);

System.out.println("Programs affected by dataset changes:");
for (ProgramId program : affectedPrograms) {
  System.out.println(" - " + program.getApplication() + "." + program.getProgram());
}

// Find all programs consuming from a stream
StreamId eventStream = NamespaceId.DEFAULT.stream("userEvents");
Set<ProgramId> streamConsumers = usageRegistry.getPrograms(eventStream);

System.out.println("Programs consuming from stream:");
for (ProgramId consumer : streamConsumers) {
  System.out.println(" - " + consumer.getApplication() + "." + consumer.getProgram());
}

// Comprehensive dependency analysis
Map<String, Set<String>> dependencies = new HashMap<>();
for (ProgramId program : affectedPrograms) {
  Set<DatasetId> datasets = usageRegistry.getDatasets(program);
  Set<StreamId> streams = usageRegistry.getStreams(program);

  Set<String> resources = new HashSet<>();
  datasets.forEach(ds -> resources.add("dataset:" + ds.getDataset()));
  streams.forEach(st -> resources.add("stream:" + st.getStream()));

  dependencies.put(program.toString(), resources);
}

System.out.println("Full dependency map: " + dependencies);
```

### Batch Registration Operations

```java
// Register multiple programs with the same dataset
List<ProgramId> batchProcessingPrograms = Arrays.asList(
  appId.mr("processor1"),
  appId.mr("processor2"),
  appId.mr("processor3"),
  appId.workflow("batchWorkflow")
);

DatasetId sharedDataset = NamespaceId.DEFAULT.dataset("sharedLookupData");

// Efficient batch registration
usageRegistry.registerAll(batchProcessingPrograms, sharedDataset);

// Verify all programs are registered
for (ProgramId program : batchProcessingPrograms) {
  Set<DatasetId> datasets = usageRegistry.getDatasets(program);
  assert datasets.contains(sharedDataset);
}

// Batch registration for stream relationships
StreamId eventStream = NamespaceId.DEFAULT.stream("auditEvents");
usageRegistry.registerAll(batchProcessingPrograms, eventStream);
```

### Application Lifecycle Management

```java
// Application deployment - register all program relationships
ApplicationId newApp = NamespaceId.DEFAULT.app("analyticsApp");
ProgramId sparkProgram = newApp.spark("analyticsEngine");
ProgramId serviceProgram = newApp.service("analyticsService");

// Register program dependencies
usageRegistry.register(sparkProgram, NamespaceId.DEFAULT.dataset("rawAnalytics"));
usageRegistry.register(sparkProgram, NamespaceId.DEFAULT.dataset("processedAnalytics"));
usageRegistry.register(serviceProgram, NamespaceId.DEFAULT.dataset("processedAnalytics"));

// Verify application dependencies before deployment
Set<DatasetId> requiredDatasets = usageRegistry.getDatasets(newApp);
for (DatasetId dataset : requiredDatasets) {
  // Check that each dataset exists and is accessible
  // (verifyDatasetExists is an application-level helper, not part of the registry)
  verifyDatasetExists(dataset);
}

// Application undeployment - clean up usage tracking
System.out.println("Unregistering application: " + newApp);
usageRegistry.unregister(newApp);

// Verify cleanup
Set<DatasetId> remainingDatasets = usageRegistry.getDatasets(newApp);
assert remainingDatasets.isEmpty() : "Application still has registered datasets";
```

### Governance and Compliance Reporting

```java
// Generate compliance report for data usage
public void generateUsageReport(NamespaceId namespace) {
  Map<String, List<String>> datasetUsage = new HashMap<>();
  Map<String, List<String>> streamUsage = new HashMap<>();

  // Get all applications in namespace
  // (getApplicationsInNamespace is an assumed helper backed by application
  // metadata, not part of UsageRegistry)
  List<ApplicationId> applications = getApplicationsInNamespace(namespace);

  for (ApplicationId app : applications) {
    Set<DatasetId> datasets = usageRegistry.getDatasets(app);
    Set<StreamId> streams = usageRegistry.getStreams(app);

    for (DatasetId dataset : datasets) {
      datasetUsage.computeIfAbsent(dataset.getDataset(), k -> new ArrayList<>())
        .add(app.getApplication());
    }

    for (StreamId stream : streams) {
      streamUsage.computeIfAbsent(stream.getStream(), k -> new ArrayList<>())
        .add(app.getApplication());
    }
  }

  System.out.println("=== Dataset Usage Report ===");
  datasetUsage.forEach((dataset, apps) -> {
    System.out.println(dataset + " used by: " + String.join(", ", apps));
  });

  System.out.println("\n=== Stream Usage Report ===");
  streamUsage.forEach((stream, apps) -> {
    System.out.println(stream + " consumed by: " + String.join(", ", apps));
  });
}

// Find unused datasets for cleanup
public Set<DatasetId> findUnusedDatasets(NamespaceId namespace) {
  // (getAllDatasetsInNamespace is an assumed helper backed by dataset metadata)
  Set<DatasetId> allDatasets = getAllDatasetsInNamespace(namespace);
  Set<DatasetId> usedDatasets = new HashSet<>();

  List<ApplicationId> applications = getApplicationsInNamespace(namespace);
  for (ApplicationId app : applications) {
    usedDatasets.addAll(usageRegistry.getDatasets(app));
  }

  Set<DatasetId> unusedDatasets = new HashSet<>(allDatasets);
  unusedDatasets.removeAll(usedDatasets);

  return unusedDatasets;
}
```

### Usage Writer for Real-time Tracking

```java
// Real-time usage tracking during program execution
public class ProgramExecutionTracker {
  private final UsageWriter usageWriter;

  public ProgramExecutionTracker(UsageWriter usageWriter) {
    this.usageWriter = usageWriter;
  }

  public void trackDatasetAccess(ProgramId program, DatasetId dataset) {
    // Register usage relationship in real-time
    usageWriter.register(program, dataset);

    // Log for audit purposes
    System.out.println("Registered dataset access: " + program + " -> " + dataset);
  }

  public void trackStreamAccess(ProgramId program, StreamId stream) {
    // Register stream usage relationship
    usageWriter.register(program, stream);

    // Log for audit purposes
    System.out.println("Registered stream access: " + program + " -> " + stream);
  }
}

// Usage in program runtime
// (UsageRegistry extends UsageWriter, so the registry can be passed directly)
ProgramExecutionTracker tracker = new ProgramExecutionTracker(usageRegistry);
ProgramId currentProgram = // ... get current program context

// Track dataset access as it happens
DatasetId dataset = NamespaceId.DEFAULT.dataset("userProfiles");
tracker.trackDatasetAccess(currentProgram, dataset);

// Track stream access
StreamId stream = NamespaceId.DEFAULT.stream("events");
tracker.trackStreamAccess(currentProgram, stream);
```

## Types

```java { .api }
// Core entity identifiers
public final class ApplicationId extends EntityId {
  public static ApplicationId of(String namespace, String application);
  public String getApplication();
  public NamespaceId getParent();

  // Program ID factory methods
  public ProgramId mr(String program);
  public ProgramId spark(String program);
  public ProgramId service(String program);
  public ProgramId worker(String program);
  public ProgramId workflow(String program);
}

public final class ProgramId extends EntityId {
  public static ProgramId of(String namespace, String application, ProgramType type, String program);
  public String getProgram();
  public ProgramType getType();
  public ApplicationId getParent();
}

public final class DatasetId extends EntityId {
  public static DatasetId of(String namespace, String dataset);
  public String getDataset();
  public NamespaceId getParent();
}

public final class StreamId extends EntityId {
  public static StreamId of(String namespace, String stream);
  public String getStream();
  public NamespaceId getParent();
}

// Program types
public enum ProgramType {
  MAPREDUCE("MapReduce"),
  WORKFLOW("Workflow"),
  SERVICE("Service"),
  SPARK("Spark"),
  WORKER("Worker");

  private final String prettyName;

  ProgramType(String prettyName) {
    this.prettyName = prettyName;
  }

  public String getPrettyName() {
    return prettyName;
  }
}

// Usage data structures
public final class DatasetUsage {
  public DatasetId getDatasetId();
  public Set<ProgramId> getPrograms();
  public long getCreationTime();
  public long getLastAccessTime();
}

public final class DatasetUsageKey {
  public DatasetId getDatasetId();
  public ProgramId getProgramId();
  public long getTimestamp();

  public String getKey();
  public static DatasetUsageKey of(DatasetId datasetId, ProgramId programId);
}

// Exception types
public class UsageException extends Exception {
  public UsageException(String message);
  public UsageException(String message, Throwable cause);
}
```