# Audit and Compliance

A comprehensive audit logging system, with pluggable publishers and structured payload builders, for compliance, monitoring, and debugging. It records data operations, changes, and access patterns within the CDAP platform to support regulatory compliance and operational monitoring.
3
4
## Capabilities
5
6
### Core Audit Publishing
7
8
The primary interface for publishing audit events. Each event targets either an `EntityId` or a `MetadataEntity` and carries an `AuditType` plus a key/value `AuditPayload`.
9
10
```java { .api }
11
/**
 * Publishes audit events for CDAP entities.
 *
 * <p>Each event pairs a target (an {@code EntityId} or a {@code MetadataEntity}), an
 * {@link AuditType} naming the kind of operation, and an {@link AuditPayload} carrying
 * operation-specific key/value details.
 */
public interface AuditPublisher {
12
// Primary audit publishing methods
13
/**
 * Publishes an audit event for an entity identified by an {@code EntityId}.
 *
 * @param entityId the entity the event refers to
 * @param auditType the kind of operation being audited
 * @param auditPayload operation-specific details of the event
 */
void publish(EntityId entityId, AuditType auditType, AuditPayload auditPayload);
14
/**
 * Publishes an audit event for an entity identified by a {@code MetadataEntity}.
 *
 * @param metadataEntity the metadata entity the event refers to
 * @param auditType the kind of operation being audited
 * @param auditPayload operation-specific details of the event
 */
void publish(MetadataEntity metadataEntity, AuditType auditType, AuditPayload auditPayload);
15
}
16
```
17
18
### Audit Publisher Implementations
19
20
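Implementations of `AuditPublisher` for different deployment scenarios: a standard publisher with configurable backends, an in-memory publisher for testing and development, and a no-op publisher for environments where auditing is disabled.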
21
22
```java { .api }
23
// Standard audit publisher with configurable backends
24
/**
 * The standard {@link AuditPublisher} implementation.
 *
 * <p>Only the type is listed here; its constructor and backend configuration are not shown in
 * this document.
 */
public class DefaultAuditPublisher implements AuditPublisher {
25
// Full-featured audit publishing with persistent storage and messaging
26
}
27
28
// In-memory audit publisher for testing and development
29
/**
 * {@link AuditPublisher} that keeps published events in memory so that tests can inspect them.
 */
public class InMemoryAuditPublisher implements AuditPublisher {
30
// Fast in-memory audit storage for testing scenarios
31
/**
 * Returns the audit messages published so far.
 *
 * <p>NOTE(review): confirm whether this is a snapshot copy or a live view of the internal store,
 * and whether it may be called concurrently with {@code publish}.
 */
public List<AuditMessage> getAuditMessages();
32
/** Discards the captured audit messages (presumably leaving getAuditMessages() empty — confirm). */
public void clear();
33
}
34
35
// No-operation publisher for environments where auditing is disabled
36
/**
 * {@link AuditPublisher} whose {@code publish} methods do nothing; use it where auditing is
 * disabled so callers need no special-casing.
 */
public class NoOpAuditPublisher implements AuditPublisher {
37
// Null object pattern implementation for non-auditing environments
38
}
39
```
40
41
### Audit Payload Builders
42
43
Static helper classes that build structured `AuditPayload`s for metadata, dataset, and program operations.
44
45
```java { .api }
46
// Metadata-specific audit payload builder
47
/**
 * Static factories for {@link AuditPayload}s that describe metadata operations.
 *
 * <p>Only the signatures are listed here; the keys each payload contains are not shown in this
 * document.
 */
public class MetadataPayloadBuilder {
48
/**
 * Builds a payload for a change of an entity's metadata properties.
 *
 * @param oldProperties properties before the change
 * @param newProperties properties after the change
 */
public static AuditPayload buildForPropertyChange(Map<String, String> oldProperties,
49
Map<String, String> newProperties);
50
/**
 * Builds a payload for a change of an entity's tags.
 *
 * @param oldTags tags before the change
 * @param newTags tags after the change
 */
public static AuditPayload buildForTagChange(Set<String> oldTags, Set<String> newTags);
51
/** Builds a payload for a change of an entity's complete {@code Metadata} (properties and tags). */
public static AuditPayload buildForMetadataChange(Metadata oldMetadata, Metadata newMetadata);
52
/** Builds a payload recording a metadata search request together with its results. */
public static AuditPayload buildForSearch(SearchRequest searchRequest, SearchResults results);
53
}
54
55
// Dataset operation audit payload builder
56
/**
 * Static factories for {@link AuditPayload}s that describe dataset operations.
 *
 * <p>Only the signatures are listed here; the keys each payload contains are not shown in this
 * document.
 */
public class DatasetPayloadBuilder {
57
/** Builds a payload for the creation of {@code datasetId} with the given {@code properties}. */
public static AuditPayload buildForDatasetCreation(DatasetId datasetId, DatasetProperties properties);
58
/** Builds a payload recording an access of kind {@code accessType} to {@code datasetId}. */
public static AuditPayload buildForDatasetAccess(DatasetId datasetId, AccessType accessType);
59
/** Builds a payload for the deletion of {@code datasetId}. */
public static AuditPayload buildForDatasetDeletion(DatasetId datasetId);
60
/**
 * Builds a payload for an update of {@code datasetId}'s properties.
 *
 * @param oldProps properties before the update
 * @param newProps properties after the update
 */
public static AuditPayload buildForDatasetUpdate(DatasetId datasetId, DatasetProperties oldProps,
61
DatasetProperties newProps);
62
}
63
64
// Program execution audit payload builder
64
/**
 * Static factories for {@link AuditPayload}s that describe program lifecycle transitions.
 *
 * <p>Only the signatures are listed here; the keys each payload contains are not shown in this
 * document.
 */
public class ProgramPayloadBuilder {
65
/** Builds a payload for the start of {@code programId}, given its runtime {@code arguments}. */
public static AuditPayload buildForProgramStart(ProgramId programId, Map<String, String> arguments);
66
/** Builds a payload for the stop of {@code programId}, given its final {@link ProgramStatus}. */
public static AuditPayload buildForProgramStop(ProgramId programId, ProgramStatus status);
67
/** Builds a payload for the suspension of {@code programId}. */
public static AuditPayload buildForProgramSuspend(ProgramId programId);
68
/** Builds a payload for the resumption of {@code programId}. */
public static AuditPayload buildForProgramResume(ProgramId programId);
69
}
71
```
72
73
### Audit Types and Categories
74
75
The `AuditType` enumeration, which groups the auditable operations and events of the CDAP platform by category.
76
77
```java { .api }
78
/**
 * Kinds of operations and events that can be audited, grouped by category.
 *
 * <p>Each constant carries a short human-readable description, available through
 * {@link #getDescription()}.
 */
public enum AuditType {
  // Entity lifecycle operations
  CREATE("Entity creation"),
  UPDATE("Entity update"),
  DELETE("Entity deletion"),

  // Access operations
  ACCESS("Entity access"),
  READ("Read operation"),
  WRITE("Write operation"),

  // Metadata operations
  METADATA_CHANGE("Metadata change"),
  TAG_CHANGE("Tag change"),
  PROPERTY_CHANGE("Property change"),

  // Security operations
  AUTHORIZATION_SUCCESS("Authorization success"),
  AUTHORIZATION_FAILURE("Authorization failure"),
  AUTHENTICATION("Authentication event"),

  // Program operations
  PROGRAM_START("Program start"),
  PROGRAM_STOP("Program stop"),
  PROGRAM_SUSPEND("Program suspend"),
  PROGRAM_RESUME("Program resume"),

  // Search operations
  SEARCH("Search operation"),

  // Administrative operations
  NAMESPACE_CREATE("Namespace creation"),
  NAMESPACE_DELETE("Namespace deletion"),
  NAMESPACE_UPDATE("Namespace update");

  private final String description;

  AuditType(String description) {
    this.description = description;
  }

  /** Returns the human-readable description of this audit type. */
  public String getDescription() {
    return description;
  }
}
113
```
114
115
## Usage Examples
116
117
### Basic Audit Publishing
118
119
```java
120
// Access audit publisher (typically injected)
121
AuditPublisher auditPublisher = // ... obtain instance
122
123
// Audit dataset creation
124
DatasetId datasetId = NamespaceId.DEFAULT.dataset("userProfiles");
125
DatasetProperties properties = DatasetProperties.builder()
126
.add("hbase.splits", "10")
127
.add("format", "avro")
128
.build();
129
130
AuditPayload creationPayload = DatasetPayloadBuilder.buildForDatasetCreation(datasetId, properties);
131
auditPublisher.publish(datasetId, AuditType.CREATE, creationPayload);
132
133
// Audit dataset access
134
AuditPayload accessPayload = DatasetPayloadBuilder.buildForDatasetAccess(datasetId, AccessType.READ_WRITE);
135
auditPublisher.publish(datasetId, AuditType.ACCESS, accessPayload);
136
137
// Audit metadata changes
138
MetadataEntity entity = MetadataEntity.ofDataset(NamespaceId.DEFAULT, "userProfiles");
139
Map<String, String> oldProperties = Map.of("environment", "dev");
140
Map<String, String> newProperties = Map.of("environment", "production", "owner", "team-alpha");
141
142
AuditPayload metadataPayload = MetadataPayloadBuilder.buildForPropertyChange(oldProperties, newProperties);
143
auditPublisher.publish(entity, AuditType.METADATA_CHANGE, metadataPayload);
144
```
145
146
### Program Execution Auditing
147
148
```java
149
// Audit program lifecycle events
150
public class ProgramAuditTracker {
151
private final AuditPublisher auditPublisher;
152
153
public ProgramAuditTracker(AuditPublisher auditPublisher) {
154
this.auditPublisher = auditPublisher;
155
}
156
157
public void auditProgramStart(ProgramId programId, Map<String, String> runtimeArgs) {
158
AuditPayload payload = ProgramPayloadBuilder.buildForProgramStart(programId, runtimeArgs);
159
auditPublisher.publish(programId, AuditType.PROGRAM_START, payload);
160
161
System.out.println("Audited program start: " + programId);
162
}
163
164
public void auditProgramStop(ProgramId programId, ProgramStatus finalStatus) {
165
AuditPayload payload = ProgramPayloadBuilder.buildForProgramStop(programId, finalStatus);
166
auditPublisher.publish(programId, AuditType.PROGRAM_STOP, payload);
167
168
System.out.println("Audited program stop: " + programId + " with status: " + finalStatus);
169
}
170
171
public void auditProgramSuspend(ProgramId programId) {
172
AuditPayload payload = ProgramPayloadBuilder.buildForProgramSuspend(programId);
173
auditPublisher.publish(programId, AuditType.PROGRAM_SUSPEND, payload);
174
}
175
176
public void auditProgramResume(ProgramId programId) {
177
AuditPayload payload = ProgramPayloadBuilder.buildForProgramResume(programId);
178
auditPublisher.publish(programId, AuditType.PROGRAM_RESUME, payload);
179
}
180
}
181
182
// Usage in program runtime
183
ProgramAuditTracker tracker = new ProgramAuditTracker(auditPublisher);
184
ProgramId programId = NamespaceId.DEFAULT.app("dataProcessor").mr("batchProcessor");
185
186
Map<String, String> args = Map.of(
187
"input.path", "/data/input/2023-06-20",
188
"output.path", "/data/output/2023-06-20",
189
"batch.size", "1000"
190
);
191
192
tracker.auditProgramStart(programId, args);
193
// ... program execution ...
194
tracker.auditProgramStop(programId, ProgramStatus.COMPLETED);
195
```
196
197
### Security and Authorization Auditing
198
199
```java
200
// Security audit helper
201
public class SecurityAuditHelper {
202
private final AuditPublisher auditPublisher;
203
204
public SecurityAuditHelper(AuditPublisher auditPublisher) {
205
this.auditPublisher = auditPublisher;
206
}
207
208
public void auditAuthorizationSuccess(EntityId entityId, String user, String action) {
209
AuditPayload payload = AuditPayload.builder()
210
.add("user", user)
211
.add("action", action)
212
.add("result", "SUCCESS")
213
.add("timestamp", String.valueOf(System.currentTimeMillis()))
214
.build();
215
216
auditPublisher.publish(entityId, AuditType.AUTHORIZATION_SUCCESS, payload);
217
}
218
219
public void auditAuthorizationFailure(EntityId entityId, String user, String action, String reason) {
220
AuditPayload payload = AuditPayload.builder()
221
.add("user", user)
222
.add("action", action)
223
.add("result", "FAILURE")
224
.add("reason", reason)
225
.add("timestamp", String.valueOf(System.currentTimeMillis()))
226
.build();
227
228
auditPublisher.publish(entityId, AuditType.AUTHORIZATION_FAILURE, payload);
229
}
230
231
public void auditAuthentication(String user, boolean success, String method) {
232
// Use system namespace for authentication events
233
EntityId systemEntity = NamespaceId.SYSTEM;
234
235
AuditPayload payload = AuditPayload.builder()
236
.add("user", user)
237
.add("method", method)
238
.add("success", String.valueOf(success))
239
.add("timestamp", String.valueOf(System.currentTimeMillis()))
240
.build();
241
242
auditPublisher.publish(systemEntity, AuditType.AUTHENTICATION, payload);
243
}
244
}
245
246
// Usage in security layer
247
SecurityAuditHelper securityAudit = new SecurityAuditHelper(auditPublisher);
248
DatasetId sensitiveDataset = NamespaceId.DEFAULT.dataset("customerPII");
249
250
// Audit successful authorization
251
securityAudit.auditAuthorizationSuccess(sensitiveDataset, "analyst@company.com", "READ");
252
253
// Audit failed authorization
254
securityAudit.auditAuthorizationFailure(sensitiveDataset, "intern@company.com", "WRITE",
255
"Insufficient privileges for PII data");
256
257
// Audit authentication events
258
securityAudit.auditAuthentication("admin@company.com", true, "KERBEROS");
259
```
260
261
### Metadata Change Auditing
262
263
```java
264
// Comprehensive metadata audit tracking
265
public class MetadataAuditTracker {
266
private final AuditPublisher auditPublisher;
267
268
public MetadataAuditTracker(AuditPublisher auditPublisher) {
269
this.auditPublisher = auditPublisher;
270
}
271
272
public void auditMetadataChange(MetadataEntity entity, Metadata oldMetadata, Metadata newMetadata) {
273
// Audit complete metadata change
274
AuditPayload completePayload = MetadataPayloadBuilder.buildForMetadataChange(oldMetadata, newMetadata);
275
auditPublisher.publish(entity, AuditType.METADATA_CHANGE, completePayload);
276
277
// Audit specific property changes
278
Map<String, String> oldProps = oldMetadata.getProperties();
279
Map<String, String> newProps = newMetadata.getProperties();
280
281
if (!oldProps.equals(newProps)) {
282
AuditPayload propPayload = MetadataPayloadBuilder.buildForPropertyChange(oldProps, newProps);
283
auditPublisher.publish(entity, AuditType.PROPERTY_CHANGE, propPayload);
284
}
285
286
// Audit specific tag changes
287
Set<String> oldTags = oldMetadata.getTags();
288
Set<String> newTags = newMetadata.getTags();
289
290
if (!oldTags.equals(newTags)) {
291
AuditPayload tagPayload = MetadataPayloadBuilder.buildForTagChange(oldTags, newTags);
292
auditPublisher.publish(entity, AuditType.TAG_CHANGE, tagPayload);
293
}
294
}
295
296
public void auditSearchOperation(SearchRequest request, SearchResults results) {
297
// Create payload for search auditing (for compliance tracking)
298
AuditPayload searchPayload = MetadataPayloadBuilder.buildForSearch(request, results);
299
300
// Use system namespace for search operations
301
MetadataEntity systemEntity = MetadataEntity.ofNamespace(NamespaceId.SYSTEM);
302
auditPublisher.publish(systemEntity, AuditType.SEARCH, searchPayload);
303
}
304
}
305
306
// Usage in metadata service
307
MetadataAuditTracker metadataAudit = new MetadataAuditTracker(auditPublisher);
308
309
Metadata oldMetadata = // ... get existing metadata
310
Metadata newMetadata = // ... get updated metadata
311
312
metadataAudit.auditMetadataChange(entity, oldMetadata, newMetadata);
313
```
314
315
### Audit Message Retrieval and Analysis
316
317
```java
318
// For testing and development environments using InMemoryAuditPublisher
319
public class AuditAnalyzer {
320
private final InMemoryAuditPublisher inMemoryPublisher;
321
322
public AuditAnalyzer(InMemoryAuditPublisher publisher) {
323
this.inMemoryPublisher = publisher;
324
}
325
326
public void analyzeAuditMessages() {
327
List<AuditMessage> messages = inMemoryPublisher.getAuditMessages();
328
329
// Analyze by audit type
330
Map<AuditType, Long> typeCount = messages.stream()
331
.collect(Collectors.groupingBy(AuditMessage::getAuditType, Collectors.counting()));
332
333
System.out.println("=== Audit Summary ===");
334
typeCount.forEach((type, count) ->
335
System.out.println(type + ": " + count + " events"));
336
337
// Analyze by entity type
338
Map<String, Long> entityCount = messages.stream()
339
.collect(Collectors.groupingBy(msg -> msg.getEntityId().getClass().getSimpleName(),
340
Collectors.counting()));
341
342
System.out.println("\n=== Entity Type Summary ===");
343
entityCount.forEach((entityType, count) ->
344
System.out.println(entityType + ": " + count + " events"));
345
346
// Find security-related events
347
List<AuditMessage> securityEvents = messages.stream()
348
.filter(msg -> msg.getAuditType() == AuditType.AUTHORIZATION_FAILURE ||
349
msg.getAuditType() == AuditType.AUTHENTICATION)
350
.collect(Collectors.toList());
351
352
System.out.println("\n=== Security Events ===");
353
securityEvents.forEach(event ->
354
System.out.println(event.getTimestamp() + ": " + event.getAuditType() +
355
" for " + event.getEntityId()));
356
}
357
358
public void clearAuditHistory() {
359
System.out.println("Clearing audit history (" +
360
inMemoryPublisher.getAuditMessages().size() + " messages)");
361
inMemoryPublisher.clear();
362
}
363
}
364
```
365
366
### Namespace Operation Auditing
367
368
```java
369
// Audit namespace operations
370
public void auditNamespaceOperations(NamespaceStore namespaceStore, AuditPublisher auditPublisher) {
371
372
// Audit namespace creation
373
NamespaceMeta namespace = NamespaceMeta.builder()
374
.setName("audit-test")
375
.setDescription("Test namespace for auditing")
376
.build();
377
378
AuditPayload creationPayload = AuditPayload.builder()
379
.add("operation", "create")
380
.add("namespace", namespace.getName())
381
.add("description", namespace.getDescription())
382
.add("creator", "admin-user")
383
.build();
384
385
try {
386
namespaceStore.create(namespace);
387
auditPublisher.publish(NamespaceId.of(namespace.getName()),
388
AuditType.NAMESPACE_CREATE, creationPayload);
389
} catch (NamespaceAlreadyExistsException e) {
390
// Audit the attempt even if it failed
391
auditPayload.add("error", e.getMessage());
392
auditPublisher.publish(NamespaceId.of(namespace.getName()),
393
AuditType.NAMESPACE_CREATE, creationPayload);
394
}
395
396
// Audit namespace deletion
397
NamespaceId namespaceToDelete = NamespaceId.of("audit-test");
398
AuditPayload deletionPayload = AuditPayload.builder()
399
.add("operation", "delete")
400
.add("namespace", namespaceToDelete.getNamespace())
401
.add("deletor", "admin-user")
402
.add("timestamp", String.valueOf(System.currentTimeMillis()))
403
.build();
404
405
try {
406
namespaceStore.delete(namespaceToDelete);
407
auditPublisher.publish(namespaceToDelete, AuditType.NAMESPACE_DELETE, deletionPayload);
408
} catch (NamespaceNotFoundException | NamespaceCannotBeDeletedException e) {
409
deletionPayload.add("error", e.getMessage());
410
auditPublisher.publish(namespaceToDelete, AuditType.NAMESPACE_DELETE, deletionPayload);
411
}
412
}
413
```
414
415
## Types
416
417
```java { .api }
418
// Audit payload structure
419
/**
 * Key/value details attached to an audit event.
 *
 * <p>Only read accessors are listed; there is no {@code add} method on the payload itself.
 * Entries are supplied through {@link Builder}. To extend an existing payload, build a new one,
 * e.g. {@code builder().addAll(existing.getPayload()).add(key, value).build()}.
 */
public final class AuditPayload {
420
/** Returns a builder for a new payload. */
public static Builder builder();
421
422
/** Returns the entries of this payload as a key/value map. */
public Map<String, String> getPayload();
423
/** Returns the value stored for {@code key} (result for a missing key not shown here — presumably null). */
public String get(String key);
424
/** Returns whether this payload has an entry for {@code key}. */
public boolean contains(String key);
425
426
/** Accumulates entries for an {@link AuditPayload}; every adder returns a Builder for chaining. */
public static class Builder {
427
/** Adds one entry. NOTE(review): handling of duplicate keys and null values is not documented here — confirm. */
public Builder add(String key, String value);
428
/** Adds every entry of {@code properties}. */
public Builder addAll(Map<String, String> properties);
429
/** Creates the payload from the entries added so far. */
public AuditPayload build();
430
}
431
}
432
433
// Audit message structure
434
/**
 * A published audit event, as returned by {@code InMemoryAuditPublisher.getAuditMessages()}.
 */
public final class AuditMessage {
435
/**
 * Returns the audited entity.
 *
 * <p>NOTE(review): events can also be published against a {@code MetadataEntity}; confirm what
 * this returns for those (possibly null).
 */
public EntityId getEntityId();
436
/** Returns the kind of operation that was audited. */
public AuditType getAuditType();
437
/** Returns the key/value details published with the event. */
public AuditPayload getPayload();
438
/** Returns when the event was recorded (presumably epoch milliseconds — confirm). */
public long getTimestamp();
439
/** Returns the user associated with the event — TODO confirm how it is determined. */
public String getUser();
440
/** Returns the message version (presumably the audit message format version — confirm). */
public String getVersion();
441
}
442
443
// Audit types enumeration
/**
 * Categories of auditable operations. Every constant carries a short human-readable label,
 * exposed through {@link #getDescription()}.
 */
public enum AuditType {
  CREATE("Entity creation"),
  UPDATE("Entity update"),
  DELETE("Entity deletion"),
  ACCESS("Entity access"),
  READ("Read operation"),
  WRITE("Write operation"),
  METADATA_CHANGE("Metadata change"),
  TAG_CHANGE("Tag change"),
  PROPERTY_CHANGE("Property change"),
  AUTHORIZATION_SUCCESS("Authorization success"),
  AUTHORIZATION_FAILURE("Authorization failure"),
  AUTHENTICATION("Authentication event"),
  PROGRAM_START("Program start"),
  PROGRAM_STOP("Program stop"),
  PROGRAM_SUSPEND("Program suspend"),
  PROGRAM_RESUME("Program resume"),
  SEARCH("Search operation"),
  NAMESPACE_CREATE("Namespace creation"),
  NAMESPACE_DELETE("Namespace deletion"),
  NAMESPACE_UPDATE("Namespace update");

  /** Human-readable label supplied by each constant. */
  private final String label;

  AuditType(String label) {
    this.label = label;
  }

  /** @return the human-readable description of this audit type */
  public String getDescription() {
    return label;
  }
}
476
477
// Program status enumeration for audit tracking
/**
 * Lifecycle states of a program run, e.g. the final status passed to
 * {@code ProgramPayloadBuilder.buildForProgramStop}.
 */
public enum ProgramStatus {
  PENDING, STARTING, RUNNING,
  SUSPENDED, RESUMING,
  STOPPING, STOPPED,
  COMPLETED, FAILED, KILLED
}
490
491
// Exception types
492
/**
 * Base checked exception for audit failures.
 *
 * <p>Note that {@code AuditPublisher.publish} declares no {@code throws} clause, so this checked
 * exception cannot propagate out of it directly.
 */
public class AuditException extends Exception {
493
/** @param message description of the failure */
public AuditException(String message);
494
/**
 * @param message description of the failure
 * @param cause the underlying cause
 */
public AuditException(String message, Throwable cause);
495
}
496
497
/**
 * {@link AuditException} presumably signalling that an audit event could not be published —
 * confirm which components throw it.
 */
public class AuditPublishException extends AuditException {
498
/** @param message description of the failure */
public AuditPublishException(String message);
499
/**
 * @param message description of the failure
 * @param cause the underlying cause
 */
public AuditPublishException(String message, Throwable cause);
500
}
501
```