or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-framework.md · data-management.md · data-processing.md · index.md · operational.md · plugin-system.md · security-metadata.md

docs/security-metadata.md

0

# Security & Metadata

1

2

CDAP provides comprehensive security and metadata management capabilities for enterprise data governance. These features enable secure storage of credentials, comprehensive metadata tracking, data lineage recording, and access control across all application components.

3

4

## Security Framework

5

6

### Secure Store

7

8

The Secure Store provides encrypted, centralized storage for sensitive configuration data such as passwords, API keys, and certificates.

9

10

```java { .api }

11

import io.cdap.cdap.api.security.store.*;

12

13

// Secure store interface for read access

14

@Beta

15

public interface SecureStore {

16

17

// List stored secrets

18

List<SecureStoreMetadata> list(String namespace) throws Exception;

19

20

// Retrieve secret data and metadata

21

SecureStoreData get(String namespace, String name) throws Exception;

22

23

// Retrieve only metadata (default implementation)

24

default SecureStoreMetadata getMetadata(String namespace, String name) throws Exception {

25

return get(namespace, name);

26

}

27

28

// Retrieve only the secret data

29

default byte[] getData(String namespace, String name) throws Exception {

30

return get(namespace, name).get();

31

}

32

}

33

34

// Secure store management interface

35

@Beta

36

public interface SecureStoreManager extends SecureStore {

37

38

// Store a secret

39

void put(String namespace, String name, String data, String description,

40

Map<String, String> properties) throws Exception;

41

42

// Delete a secret

43

void delete(String namespace, String name) throws Exception;

44

}

45

46

// Secure store data container

47

public class SecureStoreData {

48

public byte[] get() { /* returns secret data */ }

49

public String getName() { /* returns secret name */ }

50

public String getDescription() { /* returns description */ }

51

public long getCreationTimeMs() { /* returns creation timestamp */ }

52

public Map<String, String> getProperties() { /* returns properties */ }

53

}

54

55

// Secure store metadata

56

public class SecureStoreMetadata {

57

public String getName() { /* returns secret name */ }

58

public String getDescription() { /* returns description */ }

59

public long getCreationTimeMs() { /* returns creation timestamp */ }

60

public long getLastModifiedTimeMs() { /* returns last modified timestamp */ }

61

public Map<String, String> getProperties() { /* returns properties */ }

62

}

63

```

64

65

### Secure Store Usage Examples

66

67

```java { .api }

68

// Using secure store in applications

69

public class DatabaseConnectorApp extends AbstractApplication {

70

71

@Override

72

public void configure(ApplicationConfigurer configurer, ApplicationContext context) {

73

configurer.setName("DatabaseConnector");

74

configurer.setDescription("Connects to external databases securely");

75

76

// Add a MapReduce program that uses secure credentials

77

configurer.addMapReduce(new SecureDataExtractionMapReduce());

78

79

// Add a service that uses API keys from secure store

80

configurer.addService(new SecureAPIService());

81

}

82

}

83

84

// MapReduce program using secure store

85

public class SecureDataExtractionMapReduce extends AbstractMapReduce {

86

87

@Override

88

public void initialize(MapReduceContext context) throws Exception {

89

Job job = context.getHadoopJob();

90

91

// Retrieve database credentials from secure store

92

SecureStore secureStore = context.getAdmin();

93

SecureStoreData dbPassword = secureStore.get(context.getNamespace(), "db-password");

94

SecureStoreData dbUsername = secureStore.get(context.getNamespace(), "db-username");

95

SecureStoreData dbUrl = secureStore.get(context.getNamespace(), "db-connection-url");

96

97

// Configure database connection securely

98

Configuration conf = job.getConfiguration();

99

conf.set("db.url", new String(dbUrl.get()));

100

conf.set("db.username", new String(dbUsername.get()));

101

conf.set("db.password", new String(dbPassword.get()));

102

103

// Set input/output configuration

104

context.setInput(Input.ofDataset("source_data"));

105

context.setOutput(Output.ofDataset("extracted_data"));

106

107

job.setMapperClass(SecureDataMapper.class);

108

}

109

110

public static class SecureDataMapper extends Mapper<byte[], Row, byte[], Put> {

111

private Connection dbConnection;

112

113

@Override

114

protected void setup(Context context) throws IOException, InterruptedException {

115

Configuration conf = context.getConfiguration();

116

117

// Initialize secure database connection

118

try {

119

String url = conf.get("db.url");

120

String username = conf.get("db.username");

121

String password = conf.get("db.password");

122

123

dbConnection = DriverManager.getConnection(url, username, password);

124

} catch (SQLException e) {

125

throw new IOException("Failed to connect to database", e);

126

}

127

}

128

129

@Override

130

protected void map(byte[] key, Row row, Context context)

131

throws IOException, InterruptedException {

132

133

try {

134

// Perform secure database operations

135

String query = "SELECT enrichment_data FROM lookup_table WHERE id = ?";

136

try (PreparedStatement stmt = dbConnection.prepareStatement(query)) {

137

stmt.setString(1, row.getString("id"));

138

139

try (ResultSet rs = stmt.executeQuery()) {

140

if (rs.next()) {

141

Put put = new Put(key);

142

put.add("original", row.getColumns());

143

put.add("enriched_data", rs.getString("enrichment_data"));

144

put.add("enrichment_timestamp", System.currentTimeMillis());

145

146

context.write(key, put);

147

}

148

}

149

}

150

} catch (SQLException e) {

151

throw new IOException("Database operation failed", e);

152

}

153

}

154

155

@Override

156

protected void cleanup(Context context) throws IOException, InterruptedException {

157

if (dbConnection != null) {

158

try {

159

dbConnection.close();

160

} catch (SQLException e) {

161

// Log error but don't fail the job

162

LOG.warn("Failed to close database connection", e);

163

}

164

}

165

}

166

}

167

}

168

169

// Service using API keys from secure store

170

@Path("/api")

171

public class SecureAPIService extends AbstractHttpServiceHandler {

172

173

private String apiKey;

174

private String apiSecret;

175

176

@Override

177

public void initialize(HttpServiceContext context) throws Exception {

178

super.initialize(context);

179

180

// Retrieve API credentials from secure store

181

SecureStore secureStore = context.getAdmin();

182

apiKey = new String(secureStore.getData(context.getNamespace(), "external-api-key"));

183

apiSecret = new String(secureStore.getData(context.getNamespace(), "external-api-secret"));

184

}

185

186

@GET

187

@Path("/external-data/{id}")

188

public void fetchExternalData(HttpServiceRequest request, HttpServiceResponder responder,

189

@PathParam("id") String id) {

190

try {

191

// Make authenticated API call using secure credentials

192

String externalData = callExternalAPI(id, apiKey, apiSecret);

193

responder.sendJson(200, externalData);

194

} catch (Exception e) {

195

LOG.error("Failed to fetch external data for ID: {}", id, e);

196

responder.sendError(500, "Failed to fetch external data");

197

}

198

}

199

200

private String callExternalAPI(String id, String key, String secret) throws IOException {

201

// Implementation for secure external API calls

202

// Use HTTPS, proper authentication headers, etc.

203

return "{}"; // Placeholder

204

}

205

}

206

207

// Administrative operations for secure store management

208

public class SecureStoreManagementAction extends AbstractCustomAction {

209

210

@Override

211

public void run(CustomActionContext context) throws Exception {

212

SecureStoreManager storeManager = context.getAdmin();

213

String namespace = context.getNamespace();

214

215

// Setup database credentials during deployment

216

Map<String, String> dbProperties = new HashMap<>();

217

dbProperties.put("environment", "production");

218

dbProperties.put("database_type", "postgresql");

219

220

// Store encrypted credentials (would typically come from deployment config)

221

storeManager.put(namespace, "db-username", "prod_db_user",

222

"Production database username", dbProperties);

223

storeManager.put(namespace, "db-password", "secure_password_123",

224

"Production database password", dbProperties);

225

storeManager.put(namespace, "db-connection-url",

226

"jdbc:postgresql://prod-db:5432/main",

227

"Production database URL", dbProperties);

228

229

// Setup API credentials

230

Map<String, String> apiProperties = new HashMap<>();

231

apiProperties.put("service", "external-analytics");

232

apiProperties.put("access_level", "read");

233

234

storeManager.put(namespace, "external-api-key", "api_key_xyz789",

235

"External analytics API key", apiProperties);

236

storeManager.put(namespace, "external-api-secret", "secret_abc456",

237

"External analytics API secret", apiProperties);

238

239

context.getMetrics().count("secure_store.secrets_configured", 5);

240

}

241

}

242

```

243

244

### Access Control

245

246

```java { .api }

247

import io.cdap.cdap.api.security.AccessException;

248

249

// Access exception for security violations

250

public class AccessException extends Exception {

251

public AccessException(String message) { super(message); }

252

public AccessException(String message, Throwable cause) { super(message, cause); }

253

}

254

255

// Security context and access patterns

256

public class SecurityAwareService extends AbstractHttpServiceHandler {

257

258

@GET

259

@Path("/secure-data")

260

public void getSecureData(HttpServiceRequest request, HttpServiceResponder responder) {

261

try {

262

// Validate user permissions

263

validateUserAccess(request);

264

265

// Access secure data

266

String userId = request.getHeader("X-User-ID");

267

Table userDataTable = getContext().getDataset("user_secure_data");

268

269

Row userData = userDataTable.get(Bytes.toBytes(userId));

270

if (userData.isEmpty()) {

271

responder.sendError(404, "User data not found");

272

return;

273

}

274

275

// Filter sensitive fields based on user role

276

JsonObject response = filterSensitiveData(userData, getUserRole(userId));

277

responder.sendJson(200, response);

278

279

} catch (AccessException e) {

280

responder.sendError(403, "Access denied: " + e.getMessage());

281

} catch (Exception e) {

282

LOG.error("Error accessing secure data", e);

283

responder.sendError(500, "Internal error");

284

}

285

}

286

287

private void validateUserAccess(HttpServiceRequest request) throws AccessException {

288

String authToken = request.getHeader("Authorization");

289

if (authToken == null || !isValidToken(authToken)) {

290

throw new AccessException("Invalid or missing authentication token");

291

}

292

293

String userRole = getUserRoleFromToken(authToken);

294

if (!hasDataAccessPermission(userRole)) {

295

throw new AccessException("Insufficient permissions for data access");

296

}

297

}

298

299

private JsonObject filterSensitiveData(Row userData, String userRole) {

300

JsonObject filtered = new JsonObject();

301

302

// Always include basic info

303

filtered.addProperty("id", userData.getString("id"));

304

filtered.addProperty("name", userData.getString("name"));

305

306

// Include sensitive data only for privileged roles

307

if ("admin".equals(userRole) || "data_analyst".equals(userRole)) {

308

filtered.addProperty("ssn", userData.getString("ssn"));

309

filtered.addProperty("salary", userData.getLong("salary"));

310

}

311

312

// Include PII only for admin role

313

if ("admin".equals(userRole)) {

314

filtered.addProperty("address", userData.getString("address"));

315

filtered.addProperty("phone", userData.getString("phone"));

316

}

317

318

return filtered;

319

}

320

321

private boolean isValidToken(String token) {

322

// Implementation for token validation

323

return token.startsWith("Bearer ") && token.length() > 50;

324

}

325

326

private String getUserRoleFromToken(String token) {

327

// Implementation for extracting user role from token

328

return "user"; // Default role

329

}

330

331

private String getUserRole(String userId) {

332

// Implementation for getting user role from user ID

333

return "user"; // Default role

334

}

335

336

private boolean hasDataAccessPermission(String role) {

337

return Arrays.asList("admin", "data_analyst", "user").contains(role);

338

}

339

}

340

```

341

342

## Metadata Management

343

344

### Metadata Framework

345

346

CDAP provides comprehensive metadata management for tracking data schema, properties, tags, and relationships across all application components.

347

348

```java { .api }

349

import io.cdap.cdap.api.metadata.*;

350

351

// Metadata reader interface

352

@Beta

353

public interface MetadataReader {

354

355

// Get all metadata for an entity

356

Map<MetadataScope, Metadata> getMetadata(MetadataEntity metadataEntity) throws MetadataException;

357

358

// Get metadata for specific scope

359

Metadata getMetadata(MetadataScope scope, MetadataEntity metadataEntity) throws MetadataException;

360

}

361

362

// Metadata writer interface

363

@Beta

364

public interface MetadataWriter {

365

366

// Add properties to an entity

367

void addProperties(MetadataEntity metadataEntity, Map<String, String> properties);

368

369

// Add tags to an entity

370

void addTags(MetadataEntity metadataEntity, String... tags);

371

void addTags(MetadataEntity metadataEntity, Iterable<String> tags);

372

373

// Remove properties from an entity

374

void removeProperties(MetadataEntity metadataEntity, String... keys);

375

376

// Remove tags from an entity

377

void removeTags(MetadataEntity metadataEntity, String... tags);

378

379

// Remove all metadata for an entity

380

void removeMetadata(MetadataEntity metadataEntity);

381

}

382

383

// Metadata container

384

public class Metadata {

385

public static final Metadata EMPTY = new Metadata(Collections.emptyMap(), Collections.emptySet());

386

387

public Metadata(Map<String, String> properties, Set<String> tags) { /* constructor */ }

388

389

public Map<String, String> getProperties() { /* returns properties */ }

390

public Set<String> getTags() { /* returns tags */ }

391

public boolean isEmpty() { /* returns if metadata is empty */ }

392

}

393

394

// Metadata entity identification

395

public class MetadataEntity {

396

public static Builder builder() { return new Builder(); }

397

398

public String getType() { /* returns entity type */ }

399

public List<String> getValue() { /* returns entity value components */ }

400

401

public static class Builder {

402

public Builder append(String type, String value) { /* append component */ return this; }

403

public Builder appendAsType(String value) { /* append as type */ return this; }

404

public MetadataEntity build() { /* build entity */ }

405

}

406

}

407

408

// Metadata scopes

409

public enum MetadataScope {

410

USER, // User-defined metadata

411

SYSTEM // System-generated metadata

412

}

413

414

// Metadata exception

415

public class MetadataException extends Exception {

416

public MetadataException(String message) { super(message); }

417

public MetadataException(String message, Throwable cause) { super(message, cause); }

418

}

419

```

420

421

### Metadata Usage Examples

422

423

```java { .api }

424

// Metadata management in applications

425

public class DataGovernanceApp extends AbstractApplication {

426

427

@Override

428

public void configure(ApplicationConfigurer configurer, ApplicationContext context) {

429

configurer.setName("DataGovernanceApp");

430

configurer.setDescription("Manages data lineage and metadata");

431

432

// Add program for metadata enrichment

433

configurer.addMapReduce(new MetadataEnrichmentMapReduce());

434

435

// Add service for metadata queries

436

configurer.addService(new MetadataQueryService());

437

438

// Add workflow for periodic metadata updates

439

configurer.addWorkflow(new MetadataMaintenanceWorkflow());

440

}

441

}

442

443

// MapReduce program that enriches data with metadata

444

public class MetadataEnrichmentMapReduce extends AbstractMapReduce {

445

446

@Override

447

public void initialize(MapReduceContext context) throws Exception {

448

Job job = context.getHadoopJob();

449

450

context.setInput(Input.ofDataset("raw_customer_data"));

451

context.setOutput(Output.ofDataset("enriched_customer_data"));

452

453

job.setMapperClass(MetadataEnrichmentMapper.class);

454

job.setReducerClass(MetadataAggregationReducer.class);

455

456

// Add metadata about this processing job

457

MetadataWriter metadataWriter = context.getMetadataWriter();

458

MetadataEntity jobEntity = MetadataEntity.builder()

459

.appendAsType("program")

460

.append("application", context.getApplicationSpecification().getName())

461

.append("program", context.getSpecification().getName())

462

.build();

463

464

// Tag the processing job

465

metadataWriter.addTags(jobEntity, "customer-processing", "batch-enrichment", "daily");

466

467

// Add properties

468

Map<String, String> jobProperties = new HashMap<>();

469

jobProperties.put("data_source", "customer_database");

470

jobProperties.put("processing_type", "enrichment");

471

jobProperties.put("schedule", "daily");

472

jobProperties.put("owner", "data-team");

473

metadataWriter.addProperties(jobEntity, jobProperties);

474

}

475

476

public static class MetadataEnrichmentMapper extends Mapper<byte[], Row, Text, CustomerRecord> {

477

478

private MetadataWriter metadataWriter;

479

480

@Override

481

protected void setup(Context context) throws IOException, InterruptedException {

482

// Access metadata writer from context

483

metadataWriter = ((MapReduceTaskContext<?>) context).getMetadataWriter();

484

}

485

486

@Override

487

protected void map(byte[] key, Row row, Context context)

488

throws IOException, InterruptedException {

489

490

String customerId = row.getString("customer_id");

491

CustomerRecord customer = new CustomerRecord();

492

customer.setId(customerId);

493

customer.setName(row.getString("name"));

494

customer.setEmail(row.getString("email"));

495

customer.setRegistrationDate(row.getLong("registration_date"));

496

497

// Enrich with metadata

498

enrichCustomerWithMetadata(customer);

499

500

// Track data lineage for the customer record

501

trackCustomerDataLineage(customerId, customer);

502

503

context.write(new Text(customerId), customer);

504

}

505

506

private void enrichCustomerWithMetadata(CustomerRecord customer) {

507

// Add computed metadata

508

long daysSinceRegistration =

509

(System.currentTimeMillis() - customer.getRegistrationDate()) / (24 * 60 * 60 * 1000);

510

customer.setDaysSinceRegistration(daysSinceRegistration);

511

512

// Classify customer based on data

513

String customerTier = classifyCustomerTier(customer);

514

customer.setTier(customerTier);

515

516

// Add data quality score

517

double qualityScore = calculateDataQuality(customer);

518

customer.setDataQualityScore(qualityScore);

519

}

520

521

private void trackCustomerDataLineage(String customerId, CustomerRecord customer) {

522

// Create metadata entity for this customer record

523

MetadataEntity customerEntity = MetadataEntity.builder()

524

.appendAsType("customer")

525

.append("id", customerId)

526

.build();

527

528

// Add metadata tags

529

metadataWriter.addTags(customerEntity, "customer-data", "pii", customer.getTier());

530

531

// Add metadata properties

532

Map<String, String> properties = new HashMap<>();

533

properties.put("data_classification", containsPII(customer) ? "sensitive" : "public");

534

properties.put("last_processed", String.valueOf(System.currentTimeMillis()));

535

properties.put("processing_job", "MetadataEnrichmentMapReduce");

536

properties.put("data_quality_score", String.valueOf(customer.getDataQualityScore()));

537

properties.put("customer_tier", customer.getTier());

538

539

metadataWriter.addProperties(customerEntity, properties);

540

}

541

542

private String classifyCustomerTier(CustomerRecord customer) {

543

// Logic to classify customer tier

544

return customer.getDaysSinceRegistration() > 365 ? "gold" : "silver";

545

}

546

547

private double calculateDataQuality(CustomerRecord customer) {

548

// Logic to calculate data quality score

549

double score = 1.0;

550

if (customer.getName() == null || customer.getName().isEmpty()) score -= 0.3;

551

if (customer.getEmail() == null || !isValidEmail(customer.getEmail())) score -= 0.4;

552

return Math.max(0.0, score);

553

}

554

555

private boolean containsPII(CustomerRecord customer) {

556

return customer.getEmail() != null || customer.getName() != null;

557

}

558

559

private boolean isValidEmail(String email) {

560

return email.contains("@") && email.contains(".");

561

}

562

}

563

}

564

565

// Service for querying metadata

566

@Path("/metadata")

567

public class MetadataQueryService extends AbstractHttpServiceHandler {

568

569

@GET

570

@Path("/entity/{type}/{id}")

571

public void getEntityMetadata(HttpServiceRequest request, HttpServiceResponder responder,

572

@PathParam("type") String entityType,

573

@PathParam("id") String entityId) {

574

try {

575

MetadataReader metadataReader = getContext().getMetadataReader();

576

577

MetadataEntity entity = MetadataEntity.builder()

578

.appendAsType(entityType)

579

.append("id", entityId)

580

.build();

581

582

Map<MetadataScope, Metadata> allMetadata = metadataReader.getMetadata(entity);

583

584

JsonObject response = new JsonObject();

585

for (Map.Entry<MetadataScope, Metadata> entry : allMetadata.entrySet()) {

586

JsonObject scopeMetadata = new JsonObject();

587

588

// Add properties

589

JsonObject properties = new JsonObject();

590

for (Map.Entry<String, String> prop : entry.getValue().getProperties().entrySet()) {

591

properties.addProperty(prop.getKey(), prop.getValue());

592

}

593

scopeMetadata.add("properties", properties);

594

595

// Add tags

596

JsonArray tags = new JsonArray();

597

for (String tag : entry.getValue().getTags()) {

598

tags.add(tag);

599

}

600

scopeMetadata.add("tags", tags);

601

602

response.add(entry.getKey().name().toLowerCase(), scopeMetadata);

603

}

604

605

responder.sendJson(200, response);

606

607

} catch (MetadataException e) {

608

responder.sendError(500, "Metadata query failed: " + e.getMessage());

609

} catch (Exception e) {

610

responder.sendError(500, "Internal error: " + e.getMessage());

611

}

612

}

613

614

@POST

615

@Path("/entity/{type}/{id}/tags")

616

public void addEntityTags(HttpServiceRequest request, HttpServiceResponder responder,

617

@PathParam("type") String entityType,

618

@PathParam("id") String entityId) {

619

try {

620

String content = Charset.forName("UTF-8").decode(

621

ByteBuffer.wrap(request.getContent())).toString();

622

JsonObject requestJson = new JsonParser().parse(content).getAsJsonObject();

623

624

JsonArray tagsArray = requestJson.getAsJsonArray("tags");

625

String[] tags = new String[tagsArray.size()];

626

for (int i = 0; i < tagsArray.size(); i++) {

627

tags[i] = tagsArray.get(i).getAsString();

628

}

629

630

MetadataWriter metadataWriter = getContext().getMetadataWriter();

631

MetadataEntity entity = MetadataEntity.builder()

632

.appendAsType(entityType)

633

.append("id", entityId)

634

.build();

635

636

metadataWriter.addTags(entity, tags);

637

638

responder.sendString(200, "Tags added successfully", "text/plain");

639

640

} catch (Exception e) {

641

responder.sendError(500, "Failed to add tags: " + e.getMessage());

642

}

643

}

644

}

645

```

646

647

## Data Lineage

648

649

Data lineage tracking provides comprehensive visibility into data flow, transformations, and dependencies across the entire data processing pipeline.

650

651

### Lineage Recording

652

653

```java { .api }

654

import io.cdap.cdap.api.lineage.field.*;

655

656

// Lineage recorder interface

657

public interface LineageRecorder {

658

659

// Record lineage operations

660

void record(Collection<? extends Operation> operations);

661

662

// Flush recorded lineage (automatic for batch programs, manual for streaming)

663

void flushLineage() throws IllegalArgumentException;

664

}

665

666

// Base operation for lineage tracking

667

public abstract class Operation {

668

public Operation(String name, String description, OperationType type,

669

List<? extends EndPoint> inputs, List<? extends EndPoint> outputs) {

670

/* constructor */

671

}

672

673

public String getName() { /* returns operation name */ }

674

public String getDescription() { /* returns operation description */ }

675

public OperationType getType() { /* returns operation type */ }

676

public List<EndPoint> getInputs() { /* returns input endpoints */ }

677

public List<EndPoint> getOutputs() { /* returns output endpoints */ }

678

}

679

680

// Operation types

681

public enum OperationType {

682

READ, // Read operation from external source

683

write, // Write operation to external sink

684

transform // Transformation operation

685

}

686

687

// Specific operation types

688

public class ReadOperation extends Operation {

689

public ReadOperation(String name, String description, EndPoint source,

690

String... fields) { /* constructor */ }

691

}

692

693

public class WriteOperation extends Operation {

694

public WriteOperation(String name, String description, EndPoint sink,

695

InputField... inputs) { /* constructor */ }

696

}

697

698

public class TransformOperation extends Operation {

699

public TransformOperation(String name, String description,

700

List<InputField> inputs, String... outputs) { /* constructor */ }

701

}

702

703

// Lineage endpoints

704

public class EndPoint {

705

public static EndPoint of(String namespace, String name) { /* create endpoint */ }

706

public static EndPoint of(String namespace, String name, Map<String, String> properties) { /* create with props */ }

707

708

public String getNamespace() { /* returns namespace */ }

709

public String getName() { /* returns name */ }

710

public Map<String, String> getProperties() { /* returns properties */ }

711

}

712

713

// Input field for lineage tracking

714

public class InputField {

715

public static InputField of(String operationName, String fieldName) { /* create input field */ }

716

717

public String getOrigin() { /* returns origin operation */ }

718

public String getField() { /* returns field name */ }

719

}

720

```

721

722

### Lineage Implementation Examples

723

724

```java { .api }

725

// MapReduce program with comprehensive lineage tracking

726

public class CustomerAnalyticsMapReduce extends AbstractMapReduce {

727

728

@Override

729

public void initialize(MapReduceContext context) throws Exception {

730

Job job = context.getHadoopJob();

731

732

context.setInput(Input.ofDataset("customer_profiles"));

733

context.addInput(Input.ofDataset("purchase_history"));

734

context.addInput(Input.ofDataset("support_tickets"));

735

736

context.setOutput(Output.ofDataset("customer_analytics"));

737

738

job.setMapperClass(CustomerAnalyticsMapper.class);

739

job.setReducerClass(CustomerAnalyticsReducer.class);

740

741

// Record lineage for this MapReduce job

742

recordJobLineage(context);

743

}

744

745

private void recordJobLineage(MapReduceContext context) {

746

LineageRecorder lineageRecorder = context;

747

748

// Define input endpoints

749

EndPoint customerProfilesEP = EndPoint.of(context.getNamespace(), "customer_profiles");

750

EndPoint purchaseHistoryEP = EndPoint.of(context.getNamespace(), "purchase_history");

751

EndPoint supportTicketsEP = EndPoint.of(context.getNamespace(), "support_tickets");

752

753

// Define output endpoint

754

EndPoint customerAnalyticsEP = EndPoint.of(context.getNamespace(), "customer_analytics");

755

756

List<Operation> operations = new ArrayList<>();

757

758

// Record read operations

759

operations.add(new ReadOperation("read_customer_profiles",

760

"Read customer profile data", customerProfilesEP,

761

"customer_id", "name", "email", "registration_date", "tier"));

762

763

operations.add(new ReadOperation("read_purchase_history",

764

"Read customer purchase history", purchaseHistoryEP,

765

"customer_id", "purchase_date", "amount", "product_category"));

766

767

operations.add(new ReadOperation("read_support_tickets",

768

"Read customer support interactions", supportTicketsEP,

769

"customer_id", "ticket_date", "issue_type", "resolution_time"));

770

771

// Record transformation operations

772

operations.add(new TransformOperation("calculate_purchase_metrics",

773

"Calculate purchase-related metrics per customer",

774

Arrays.asList(

775

InputField.of("read_purchase_history", "customer_id"),

776

InputField.of("read_purchase_history", "amount"),

777

InputField.of("read_purchase_history", "purchase_date")

778

),

779

"total_spent", "avg_order_value", "purchase_frequency", "last_purchase_date"

780

));

781

782

operations.add(new TransformOperation("calculate_support_metrics",

783

"Calculate support-related metrics per customer",

784

Arrays.asList(

785

InputField.of("read_support_tickets", "customer_id"),

786

InputField.of("read_support_tickets", "ticket_date"),

787

InputField.of("read_support_tickets", "resolution_time")

788

),

789

"total_tickets", "avg_resolution_time", "last_ticket_date"

790

));

791

792

operations.add(new TransformOperation("merge_customer_data",

793

"Merge profile, purchase, and support data per customer",

794

Arrays.asList(

795

InputField.of("read_customer_profiles", "customer_id"),

796

InputField.of("read_customer_profiles", "name"),

797

InputField.of("read_customer_profiles", "email"),

798

InputField.of("read_customer_profiles", "tier"),

799

InputField.of("calculate_purchase_metrics", "total_spent"),

800

InputField.of("calculate_purchase_metrics", "avg_order_value"),

801

InputField.of("calculate_purchase_metrics", "purchase_frequency"),

802

InputField.of("calculate_support_metrics", "total_tickets"),

803

InputField.of("calculate_support_metrics", "avg_resolution_time")

804

),

805

"customer_id", "name", "email", "tier", "total_spent", "avg_order_value",

806

"purchase_frequency", "total_tickets", "avg_resolution_time", "customer_score"

807

));

808

809

// Record write operation

810

operations.add(new WriteOperation("write_customer_analytics",

811

"Write aggregated customer analytics", customerAnalyticsEP,

812

InputField.of("merge_customer_data", "customer_id"),

813

InputField.of("merge_customer_data", "name"),

814

InputField.of("merge_customer_data", "email"),

815

InputField.of("merge_customer_data", "tier"),

816

InputField.of("merge_customer_data", "total_spent"),

817

InputField.of("merge_customer_data", "avg_order_value"),

818

InputField.of("merge_customer_data", "purchase_frequency"),

819

InputField.of("merge_customer_data", "total_tickets"),

820

InputField.of("merge_customer_data", "avg_resolution_time"),

821

InputField.of("merge_customer_data", "customer_score")

822

));

823

824

// Record all operations for lineage

825

lineageRecorder.record(operations);

826

}

827

828

// Mapper and Reducer implementations would process the actual data

829

// while the lineage operations describe the data flow and transformations

830

}

831

832

// Spark program with streaming lineage tracking

833

public class RealTimeRecommendationSpark extends AbstractSpark {

834

835

@Override

836

public void run(SparkClientContext context) throws Exception {

837

SparkSession spark = context.getSparkSession();

838

LineageRecorder lineageRecorder = context;

839

840

// Read streaming data

841

Dataset<Row> userEvents = spark

842

.readStream()

843

.format("kafka")

844

.option("kafka.bootstrap.servers", "localhost:9092")

845

.option("subscribe", "user-events")

846

.load();

847

848

// Read static user profiles

849

JavaPairRDD<byte[], Row> profilesRDD = context.fromDataset("user_profiles");

850

Dataset<Row> userProfiles = spark.createDataFrame(profilesRDD.map(Tuple2::_2).rdd(), getProfileSchema());

851

852

// Process and generate recommendations

853

Dataset<Row> recommendations = userEvents

854

.join(userProfiles, "user_id")

855

.groupBy("user_id", "category")

856

.agg(count("event").as("event_count"))

857

.withColumn("recommendation_score", expr("event_count * 0.8"))

858

.filter("recommendation_score > 5");

859

860

// Record lineage for streaming operations

861

recordStreamingLineage(lineageRecorder, context.getNamespace());

862

863

// Write recommendations and flush lineage periodically

864

StreamingQuery query = recommendations

865

.writeStream()

866

.outputMode("update")

867

.foreachBatch((Dataset<Row> batchDF, Long batchId) -> {

868

// Convert and save to dataset

869

JavaPairRDD<byte[], Put> outputRDD = batchDF.javaRDD().mapToPair(row -> {

870

String key = row.getAs("user_id") + "_" + row.getAs("category");

871

Put put = new Put(Bytes.toBytes(key));

872

put.add("rec", "user_id", row.getAs("user_id"));

873

put.add("rec", "category", row.getAs("category"));

874

put.add("rec", "score", row.getAs("recommendation_score"));

875

put.add("rec", "batch_id", batchId);

876

return new Tuple2<>(Bytes.toBytes(key), put);

877

});

878

879

context.saveAsDataset(outputRDD, "user_recommendations");

880

881

// Manually flush lineage for streaming program

882

if (batchId % 10 == 0) { // Flush every 10 batches

883

lineageRecorder.flushLineage();

884

}

885

})

886

.start();

887

888

query.awaitTermination();

889

}

890

891

private void recordStreamingLineage(LineageRecorder recorder, String namespace) {

892

List<Operation> operations = new ArrayList<>();

893

894

// Input endpoints

895

Map<String, String> kafkaProps = new HashMap<>();

896

kafkaProps.put("topic", "user-events");

897

kafkaProps.put("format", "json");

898

EndPoint userEventsEP = EndPoint.of("kafka", "user-events", kafkaProps);

899

EndPoint userProfilesEP = EndPoint.of(namespace, "user_profiles");

900

EndPoint recommendationsEP = EndPoint.of(namespace, "user_recommendations");

901

902

// Read operations

903

operations.add(new ReadOperation("read_user_events",

904

"Stream user events from Kafka", userEventsEP,

905

"user_id", "event_type", "category", "timestamp"));

906

907

operations.add(new ReadOperation("read_user_profiles",

908

"Read static user profile data", userProfilesEP,

909

"user_id", "preferences", "demographics"));

910

911

// Transform operations

912

operations.add(new TransformOperation("join_events_profiles",

913

"Join streaming events with user profiles",

914

Arrays.asList(

915

InputField.of("read_user_events", "user_id"),

916

InputField.of("read_user_events", "event_type"),

917

InputField.of("read_user_events", "category"),

918

InputField.of("read_user_profiles", "user_id"),

919

InputField.of("read_user_profiles", "preferences")

920

),

921

"user_id", "event_type", "category", "preferences"

922

));

923

924

operations.add(new TransformOperation("calculate_recommendations",

925

"Calculate recommendation scores based on event patterns",

926

Arrays.asList(

927

InputField.of("join_events_profiles", "user_id"),

928

InputField.of("join_events_profiles", "category"),

929

InputField.of("join_events_profiles", "event_type")

930

),

931

"user_id", "category", "recommendation_score"

932

));

933

934

// Write operation

935

operations.add(new WriteOperation("write_recommendations",

936

"Write real-time recommendations", recommendationsEP,

937

InputField.of("calculate_recommendations", "user_id"),

938

InputField.of("calculate_recommendations", "category"),

939

InputField.of("calculate_recommendations", "recommendation_score")

940

));

941

942

recorder.record(operations);

943

}

944

}

```

The Security & Metadata framework in CDAP enables comprehensive data governance with enterprise-grade security features, detailed metadata tracking, and complete data lineage visibility across all data processing operations.