or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

audit-compliance.md dataset-management.md index.md metadata-management.md namespace-management.md stream-processing.md transaction-management.md usage-registry.md

docs/usage-registry.md

0

# Usage Registry

1

2

The Usage Registry tracks which programs use which datasets and streams, supporting governance, lineage analysis, and impact assessment through comprehensive query capabilities. It provides essential functionality for understanding dependencies between programs and the data they access, enabling effective governance and impact analysis across the CDAP platform.

3

4

## Capabilities

5

6

### Core Usage Registry Operations

7

8

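The primary interface for tracking and querying usage relationships. Queries are bidirectional: you can look up the datasets or streams used by an application or program, and you can look up the programs that use a given dataset or stream.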

9

10

```java { .api }

11

public interface UsageRegistry extends UsageWriter {

12

// Application lifecycle management

13

void unregister(ApplicationId applicationId);

14

15

// Dataset relationship queries

16

Set<DatasetId> getDatasets(ApplicationId id);

17

Set<DatasetId> getDatasets(ProgramId id);

18

19

// Stream relationship queries

20

Set<StreamId> getStreams(ApplicationId id);

21

Set<StreamId> getStreams(ProgramId id);

22

23

// Program relationship queries (reverse lookups)

24

Set<ProgramId> getPrograms(DatasetId id);

25

Set<ProgramId> getPrograms(StreamId id);

26

}

27

28

public interface UsageWriter {

29

// Usage registration methods for tracking relationships

30

void register(ProgramId programId, DatasetId datasetId);

31

void register(ProgramId programId, StreamId streamId);

32

void registerAll(Iterable<? extends ProgramId> programIds, DatasetId datasetId);

33

void registerAll(Iterable<? extends ProgramId> programIds, StreamId streamId);

34

}

35

```

36

37

### Usage Registry Implementations

38

39

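Implementations of the `UsageRegistry` and `UsageWriter` interfaces for various deployment scenarios and performance requirements. Note that `MessagingUsageWriter` implements only `UsageWriter`, so it supports registration but not queries.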

40

41

```java { .api }

42

// Standard usage registry implementation

43

public class BasicUsageRegistry implements UsageRegistry {

44

// Full-featured usage tracking with persistent storage

45

}

46

47

// No-operation implementation for testing

48

public class NoOpUsageRegistry implements UsageRegistry {

49

// Null object pattern implementation for testing scenarios

50

}

51

52

// Messaging-based usage writer for distributed systems

53

public class MessagingUsageWriter implements UsageWriter {

54

// Asynchronous usage tracking via messaging system

55

}

56

```

57

58

### Usage Data Models

59

60

Internal data structures for representing and managing usage relationships.

61

62

```java { .api }

63

// Dataset usage information structure

64

public class DatasetUsage {

65

public DatasetId getDatasetId();

66

public Set<ProgramId> getPrograms();

67

public long getTimestamp();

68

}

69

70

// Usage record key for efficient storage and retrieval

71

public class DatasetUsageKey {

72

public DatasetId getDatasetId();

73

public ProgramId getProgramId();

74

public String getKey();

75

}

76

```

77

78

## Usage Examples

79

80

### Basic Usage Registration and Queries

81

82

```java

83

// Access usage registry (typically injected)

84

UsageRegistry usageRegistry = // ... obtain instance

85

86

// Define application and program identifiers

87

ApplicationId appId = NamespaceId.DEFAULT.app("dataProcessingApp");

88

ProgramId mapReduceProgram = appId.mr("dataProcessor");

89

ProgramId workflowProgram = appId.workflow("dataWorkflow");

90

91

// Define dataset and stream identifiers

92

DatasetId inputDataset = NamespaceId.DEFAULT.dataset("rawData");

93

DatasetId outputDataset = NamespaceId.DEFAULT.dataset("processedData");

94

StreamId inputStream = NamespaceId.DEFAULT.stream("events");

95

96

// Register program-dataset relationships

97

usageRegistry.register(mapReduceProgram, inputDataset);

98

usageRegistry.register(mapReduceProgram, outputDataset);

99

usageRegistry.register(workflowProgram, inputDataset);

100

101

// Register program-stream relationships

102

usageRegistry.register(mapReduceProgram, inputStream);

103

104

// Query datasets used by application

105

Set<DatasetId> appDatasets = usageRegistry.getDatasets(appId);

106

System.out.println("Application datasets: " + appDatasets);

107

108

// Query datasets used by specific program

109

Set<DatasetId> programDatasets = usageRegistry.getDatasets(mapReduceProgram);

110

System.out.println("MapReduce program datasets: " + programDatasets);

111

112

// Query streams used by program

113

Set<StreamId> programStreams = usageRegistry.getStreams(mapReduceProgram);

114

System.out.println("MapReduce program streams: " + programStreams);

115

```

116

117

### Impact Analysis and Dependency Discovery

118

119

```java

120

// Find all programs that use a specific dataset (impact analysis)

121

DatasetId criticalDataset = NamespaceId.DEFAULT.dataset("customerData");

122

Set<ProgramId> affectedPrograms = usageRegistry.getPrograms(criticalDataset);

123

124

System.out.println("Programs affected by dataset changes:");

125

for (ProgramId program : affectedPrograms) {

126

System.out.println(" - " + program.getApplication() + "." + program.getProgram());

127

}

128

129

// Find all programs consuming from a stream

130

StreamId eventStream = NamespaceId.DEFAULT.stream("userEvents");

131

Set<ProgramId> streamConsumers = usageRegistry.getPrograms(eventStream);

132

133

System.out.println("Programs consuming from stream:");

134

for (ProgramId consumer : streamConsumers) {

135

System.out.println(" - " + consumer.getApplication() + "." + consumer.getProgram());

136

}

137

138

// Comprehensive dependency analysis

139

Map<String, Set<String>> dependencies = new HashMap<>();

140

for (ProgramId program : affectedPrograms) {

141

Set<DatasetId> datasets = usageRegistry.getDatasets(program);

142

Set<StreamId> streams = usageRegistry.getStreams(program);

143

144

Set<String> resources = new HashSet<>();

145

datasets.forEach(ds -> resources.add("dataset:" + ds.getDataset()));

146

streams.forEach(st -> resources.add("stream:" + st.getStream()));

147

148

dependencies.put(program.toString(), resources);

149

}

150

151

System.out.println("Full dependency map: " + dependencies);

152

```

153

154

### Batch Registration Operations

155

156

```java

157

// Register multiple programs with the same dataset

158

List<ProgramId> batchProcessingPrograms = Arrays.asList(

159

appId.mr("processor1"),

160

appId.mr("processor2"),

161

appId.mr("processor3"),

162

appId.workflow("batchWorkflow")

163

);

164

165

DatasetId sharedDataset = NamespaceId.DEFAULT.dataset("sharedLookupData");

166

167

// Efficient batch registration

168

usageRegistry.registerAll(batchProcessingPrograms, sharedDataset);

169

170

// Verify all programs are registered

171

for (ProgramId program : batchProcessingPrograms) {

172

Set<DatasetId> datasets = usageRegistry.getDatasets(program);

173

assert datasets.contains(sharedDataset);

174

}

175

176

// Batch registration for stream relationships

177

StreamId eventStream = NamespaceId.DEFAULT.stream("auditEvents");

178

usageRegistry.registerAll(batchProcessingPrograms, eventStream);

179

```

180

181

### Application Lifecycle Management

182

183

```java

184

// Application deployment - register all program relationships

185

ApplicationId newApp = NamespaceId.DEFAULT.app("analyticsApp");

186

ProgramId sparkProgram = newApp.spark("analyticsEngine");

187

ProgramId serviceProgram = newApp.service("analyticsService");

188

189

// Register program dependencies

190

usageRegistry.register(sparkProgram, NamespaceId.DEFAULT.dataset("rawAnalytics"));

191

usageRegistry.register(sparkProgram, NamespaceId.DEFAULT.dataset("processedAnalytics"));

192

usageRegistry.register(serviceProgram, NamespaceId.DEFAULT.dataset("processedAnalytics"));

193

194

// Verify application dependencies before deployment

195

Set<DatasetId> requiredDatasets = usageRegistry.getDatasets(newApp);

196

for (DatasetId dataset : requiredDatasets) {

197

// Check if datasets exist and are accessible

198

verifyDatasetExists(dataset);

199

}

200

201

// Application undeployment - clean up usage tracking

202

System.out.println("Unregistering application: " + newApp);

203

usageRegistry.unregister(newApp);

204

205

// Verify cleanup

206

Set<DatasetId> remainingDatasets = usageRegistry.getDatasets(newApp);

207

assert remainingDatasets.isEmpty() : "Application still has registered datasets";

208

```

209

210

### Governance and Compliance Reporting

211

212

```java

213

// Generate compliance report for data usage

214

public void generateUsageReport(NamespaceId namespace) {

215

Map<String, List<String>> datasetUsage = new HashMap<>();

216

Map<String, List<String>> streamUsage = new HashMap<>();

217

218

// Get all applications in namespace

219

List<ApplicationId> applications = getApplicationsInNamespace(namespace);

220

221

for (ApplicationId app : applications) {

222

Set<DatasetId> datasets = usageRegistry.getDatasets(app);

223

Set<StreamId> streams = usageRegistry.getStreams(app);

224

225

for (DatasetId dataset : datasets) {

226

datasetUsage.computeIfAbsent(dataset.getDataset(), k -> new ArrayList<>())

227

.add(app.getApplication());

228

}

229

230

for (StreamId stream : streams) {

231

streamUsage.computeIfAbsent(stream.getStream(), k -> new ArrayList<>())

232

.add(app.getApplication());

233

}

234

}

235

236

System.out.println("=== Dataset Usage Report ===");

237

datasetUsage.forEach((dataset, apps) -> {

238

System.out.println(dataset + " used by: " + String.join(", ", apps));

239

});

240

241

System.out.println("\n=== Stream Usage Report ===");

242

streamUsage.forEach((stream, apps) -> {

243

System.out.println(stream + " consumed by: " + String.join(", ", apps));

244

});

245

}

246

247

// Find unused datasets for cleanup

248

public Set<DatasetId> findUnusedDatasets(NamespaceId namespace) {

249

Set<DatasetId> allDatasets = getAllDatasetsInNamespace(namespace);

250

Set<DatasetId> usedDatasets = new HashSet<>();

251

252

List<ApplicationId> applications = getApplicationsInNamespace(namespace);

253

for (ApplicationId app : applications) {

254

usedDatasets.addAll(usageRegistry.getDatasets(app));

255

}

256

257

Set<DatasetId> unusedDatasets = new HashSet<>(allDatasets);

258

unusedDatasets.removeAll(usedDatasets);

259

260

return unusedDatasets;

261

}

262

```

263

264

### Usage Writer for Real-time Tracking

265

266

```java

267

// Real-time usage tracking during program execution

268

public class ProgramExecutionTracker {

269

private final UsageWriter usageWriter;

270

271

public ProgramExecutionTracker(UsageWriter usageWriter) {

272

this.usageWriter = usageWriter;

273

}

274

275

public void trackDatasetAccess(ProgramId program, DatasetId dataset) {

276

// Register usage relationship in real-time

277

usageWriter.register(program, dataset);

278

279

// Log for audit purposes

280

System.out.println("Registered dataset access: " + program + " -> " + dataset);

281

}

282

283

public void trackStreamAccess(ProgramId program, StreamId stream) {

284

// Register stream usage relationship

285

usageWriter.register(program, stream);

286

287

// Log for audit purposes

288

System.out.println("Registered stream access: " + program + " -> " + stream);

289

}

290

}

291

292

// Usage in program runtime

293

ProgramExecutionTracker tracker = new ProgramExecutionTracker(usageRegistry);

294

ProgramId currentProgram = // ... get current program context

295

296

// Track dataset access as it happens

297

DatasetId dataset = NamespaceId.DEFAULT.dataset("userProfiles");

298

tracker.trackDatasetAccess(currentProgram, dataset);

299

300

// Track stream access

301

StreamId stream = NamespaceId.DEFAULT.stream("events");

302

tracker.trackStreamAccess(currentProgram, stream);

303

```

304

305

## Types

306

307

```java { .api }

308

// Core entity identifiers

309

public final class ApplicationId extends EntityId {

310

public static ApplicationId of(String namespace, String application);

311

public String getApplication();

312

public NamespaceId getParent();

313

314

// Program ID factory methods

315

public ProgramId mr(String program);

316

public ProgramId spark(String program);

317

public ProgramId service(String program);

318

public ProgramId worker(String program);

319

public ProgramId workflow(String program);

320

}

321

322

public final class ProgramId extends EntityId {

323

public static ProgramId of(String namespace, String application, ProgramType type, String program);

324

public String getProgram();

325

public ProgramType getType();

326

public ApplicationId getParent();

327

}

328

329

public final class DatasetId extends EntityId {

330

public static DatasetId of(String namespace, String dataset);

331

public String getDataset();

332

public NamespaceId getParent();

333

}

334

335

public final class StreamId extends EntityId {

336

public static StreamId of(String namespace, String stream);

337

public String getStream();

338

public NamespaceId getParent();

339

}

340

341

// Program types

342

public enum ProgramType {

343

MAPREDUCE("MapReduce"),

344

WORKFLOW("Workflow"),

345

SERVICE("Service"),

346

SPARK("Spark"),

347

WORKER("Worker");

348

349

private final String prettyName;

350

351

ProgramType(String prettyName) {

352

this.prettyName = prettyName;

353

}

354

355

public String getPrettyName() {

356

return prettyName;

357

}

358

}

359

360

// Usage data structures

361

public final class DatasetUsage {

362

public DatasetId getDatasetId();

363

public Set<ProgramId> getPrograms();

364

public long getCreationTime();

365

public long getLastAccessTime();

366

}

367

368

public final class DatasetUsageKey {

369

public DatasetId getDatasetId();

370

public ProgramId getProgramId();

371

public long getTimestamp();

372

373

public String getKey();

374

public static DatasetUsageKey of(DatasetId datasetId, ProgramId programId);

375

}

376

377

// Exception types

378

public class UsageException extends Exception {

379

public UsageException(String message);

380

public UsageException(String message, Throwable cause);

381

}

382

```