# Lineage and Metadata

Data lineage tracking and field-level transformations for governance, debugging, and compliance in CDAP ETL pipelines.

## Core Lineage Concepts

### AccessType

Enumeration of data access types for lineage tracking.

```java { .api }
package io.cdap.cdap.etl.api.lineage;

public enum AccessType {
  READ,    // Data read operation
  WRITE,   // Data write operation
  UNKNOWN  // Unknown access type
}
```

## Field-Level Lineage

### LineageRecorder

Interface for recording field-level lineage information during pipeline execution.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public interface LineageRecorder {
  /**
   * Record field operations for lineage tracking.
   */
  void record(List<FieldOperation> fieldOperations);
}
```

**Lineage Recording Example:**

```java
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("CustomerDataEnricher")
public class CustomerDataEnricher extends Transform<StructuredRecord, StructuredRecord> {

  private final Config config;

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    TransformContext context = getContext();
    LineageRecorder lineageRecorder = context.getLineageRecorder();

    // Build enriched customer record
    StructuredRecord.Builder builder = StructuredRecord.builder(context.getOutputSchema());

    // Direct field mappings
    builder.set("customer_id", input.get("id"));
    builder.set("first_name", input.get("fname"));
    builder.set("last_name", input.get("lname"));

    // Derived fields
    String fullName = input.get("fname") + " " + input.get("lname");
    builder.set("full_name", fullName);

    String email = generateEmail(input.get("fname"), input.get("lname"));
    builder.set("email", email);

    // Lookup enrichment
    String customerId = input.get("id");
    CustomerProfile profile = lookupCustomerProfile(customerId);
    if (profile != null) {
      builder.set("segment", profile.getSegment());
      builder.set("lifetime_value", profile.getLifetimeValue());
    }

    // Record field lineage
    List<FieldOperation> operations = Arrays.asList(
        // Direct field reads
        new FieldReadOperation("read_customer_id",
            "Read customer ID from source",
            Arrays.asList("id")),
        new FieldReadOperation("read_names",
            "Read customer names from source",
            Arrays.asList("fname", "lname")),

        // Field transformations
        new FieldTransformOperation("map_customer_id",
            "Map source ID to customer_id",
            Arrays.asList("id"),
            Arrays.asList("customer_id")),
        new FieldTransformOperation("map_first_name",
            "Map fname to first_name",
            Arrays.asList("fname"),
            Arrays.asList("first_name")),
        new FieldTransformOperation("map_last_name",
            "Map lname to last_name",
            Arrays.asList("lname"),
            Arrays.asList("last_name")),

        // Derived field operations
        new FieldTransformOperation("derive_full_name",
            "Concatenate first and last name",
            Arrays.asList("fname", "lname"),
            Arrays.asList("full_name")),
        new FieldTransformOperation("generate_email",
            "Generate email from names",
            Arrays.asList("fname", "lname"),
            Arrays.asList("email")),

        // Lookup operations
        new FieldTransformOperation("lookup_segment",
            "Lookup customer segment",
            Arrays.asList("id"),
            Arrays.asList("segment")),
        new FieldTransformOperation("lookup_lifetime_value",
            "Lookup customer lifetime value",
            Arrays.asList("id"),
            Arrays.asList("lifetime_value")),

        // Final write operation
        new FieldWriteOperation("write_enriched_customer",
            "Write enriched customer record",
            Arrays.asList("customer_id", "first_name", "last_name",
                "full_name", "email", "segment", "lifetime_value"))
    );

    lineageRecorder.record(operations);

    emitter.emit(builder.build());
  }
}
```

### FieldOperation

Base class for field operations in lineage tracking.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public class FieldOperation {
  /**
   * Create field operation.
   */
  public FieldOperation(String name, String description, OperationType type,
                        List<String> inputs, List<String> outputs) {}

  /**
   * Get operation name.
   */
  public String getName() {}

  /**
   * Get operation description.
   */
  public String getDescription() {}

  /**
   * Get operation type.
   */
  public OperationType getType() {}

  /**
   * Get input field names.
   */
  public List<String> getInputs() {}

  /**
   * Get output field names.
   */
  public List<String> getOutputs() {}
}
```

### FieldReadOperation

Field operation for reading data from source fields.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public class FieldReadOperation extends FieldOperation {
  /**
   * Create read operation for fields.
   */
  public FieldReadOperation(String name, String description, List<String> fields) {
    super(name, description, OperationType.READ, Collections.emptyList(), fields);
  }
}
```

### FieldWriteOperation

Field operation for writing data to destination fields.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public class FieldWriteOperation extends FieldOperation {
  /**
   * Create write operation for fields.
   */
  public FieldWriteOperation(String name, String description, List<String> fields) {
    super(name, description, OperationType.WRITE, fields, Collections.emptyList());
  }
}
```

### FieldTransformOperation

Field operation for data transformations between fields.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public class FieldTransformOperation extends FieldOperation {
  /**
   * Create transform operation from inputs to outputs.
   */
  public FieldTransformOperation(String name, String description,
                                 List<String> inputs, List<String> outputs) {
    super(name, description, OperationType.TRANSFORM, inputs, outputs);
  }
}
```

### OperationType

Enumeration of field operation types.

```java { .api }
package io.cdap.cdap.etl.api.lineage.field;

public enum OperationType {
  READ,      // Read operation from source
  WRITE,     // Write operation to destination
  TRANSFORM  // Transformation between fields
}
```

## Complex Lineage Tracking Examples

### Data Aggregation Lineage

```java
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name("SalesAggregator")
public class SalesAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {

  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                        Emitter<StructuredRecord> emitter) throws Exception {

    // Collect aggregation data
    double totalSales = 0.0;
    int orderCount = 0;
    double maxOrderValue = 0.0;
    double minOrderValue = Double.MAX_VALUE;
    Set<String> uniqueCustomers = new HashSet<>();

    while (groupValues.hasNext()) {
      StructuredRecord record = groupValues.next();
      Double salesAmount = record.get("sales_amount");
      String customerId = record.get("customer_id");

      if (salesAmount != null) {
        totalSales += salesAmount;
        orderCount++;
        maxOrderValue = Math.max(maxOrderValue, salesAmount);
        minOrderValue = Math.min(minOrderValue, salesAmount);
      }

      if (customerId != null) {
        uniqueCustomers.add(customerId);
      }
    }

    double avgOrderValue = orderCount > 0 ? totalSales / orderCount : 0.0;

    // Build result record
    StructuredRecord result = StructuredRecord.builder(getContext().getOutputSchema())
        .set("region", groupKey)
        .set("total_sales", totalSales)
        .set("order_count", orderCount)
        .set("avg_order_value", avgOrderValue)
        .set("max_order_value", maxOrderValue == 0.0 ? null : maxOrderValue)
        .set("min_order_value", minOrderValue == Double.MAX_VALUE ? null : minOrderValue)
        .set("unique_customers", uniqueCustomers.size())
        .build();

    // Record detailed lineage for aggregation
    BatchRuntimeContext context = getContext();
    LineageRecorder lineageRecorder = context.getLineageRecorder();

    List<FieldOperation> operations = Arrays.asList(
        // Input field reads
        new FieldReadOperation("read_sales_data",
            "Read sales transaction data",
            Arrays.asList("region", "sales_amount", "customer_id")),

        // Aggregation operations
        new FieldTransformOperation("group_by_region",
            "Group sales data by region",
            Arrays.asList("region"),
            Arrays.asList("region")),
        new FieldTransformOperation("sum_sales_amount",
            "Sum sales amounts for region",
            Arrays.asList("sales_amount"),
            Arrays.asList("total_sales")),
        new FieldTransformOperation("count_orders",
            "Count number of orders",
            Arrays.asList("sales_amount"),
            Arrays.asList("order_count")),
        new FieldTransformOperation("calculate_avg_order",
            "Calculate average order value",
            Arrays.asList("sales_amount"),
            Arrays.asList("avg_order_value")),
        new FieldTransformOperation("find_max_order",
            "Find maximum order value",
            Arrays.asList("sales_amount"),
            Arrays.asList("max_order_value")),
        new FieldTransformOperation("find_min_order",
            "Find minimum order value",
            Arrays.asList("sales_amount"),
            Arrays.asList("min_order_value")),
        new FieldTransformOperation("count_unique_customers",
            "Count unique customers in region",
            Arrays.asList("customer_id"),
            Arrays.asList("unique_customers")),

        // Output write
        new FieldWriteOperation("write_aggregated_sales",
            "Write aggregated sales summary",
            Arrays.asList("region", "total_sales", "order_count",
                "avg_order_value", "max_order_value",
                "min_order_value", "unique_customers"))
    );

    lineageRecorder.record(operations);
    emitter.emit(result);
  }
}
```

### Join Operation Lineage

```java
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("CustomerOrderJoiner")
public class CustomerOrderJoiner extends BatchAutoJoiner {

  @Override
  public JoinDefinition define(AutoJoinerContext context) {
    // Join definition code...

    // Record lineage for join operation
    recordJoinLineage(context);

    return joinDefinition;
  }

  private void recordJoinLineage(AutoJoinerContext context) {
    // This would typically be called during runtime, but shown here for illustration
    List<FieldOperation> operations = Arrays.asList(
        // Read operations from each input stage
        new FieldReadOperation("read_customer_data",
            "Read customer information",
            Arrays.asList("customers.customer_id", "customers.name",
                "customers.email", "customers.registration_date")),
        new FieldReadOperation("read_order_data",
            "Read order information",
            Arrays.asList("orders.order_id", "orders.customer_id",
                "orders.amount", "orders.order_date")),

        // Join key matching
        new FieldTransformOperation("join_on_customer_id",
            "Join customers and orders on customer_id",
            Arrays.asList("customers.customer_id", "orders.customer_id"),
            Arrays.asList("customer_id")),

        // Field selections and mappings
        new FieldTransformOperation("select_customer_info",
            "Select customer information fields",
            Arrays.asList("customers.customer_id", "customers.name", "customers.email"),
            Arrays.asList("customer_id", "customer_name", "customer_email")),
        new FieldTransformOperation("select_order_info",
            "Select order information fields",
            Arrays.asList("orders.order_id", "orders.amount", "orders.order_date"),
            Arrays.asList("order_id", "order_amount", "order_date")),

        // Derived fields
        new FieldTransformOperation("derive_customer_since",
            "Map registration date to customer_since",
            Arrays.asList("customers.registration_date"),
            Arrays.asList("customer_since")),

        // Output write
        new FieldWriteOperation("write_joined_data",
            "Write joined customer and order data",
            Arrays.asList("customer_id", "customer_name", "customer_email",
                "customer_since", "order_id", "order_amount", "order_date"))
    );

    // In actual implementation, this would be recorded during runtime
    // LineageRecorder would be available in the runtime context
  }
}
```

### Multi-Stage Pipeline Lineage

```java
public class PipelineLineageTracker {

  public static void recordPipelineLineage(PipelineContext pipelineContext,
                                           Map<String, List<FieldOperation>> stageOperations) {
    LineageRecorder recorder = pipelineContext.getLineageRecorder();

    // Combine operations from all stages
    List<FieldOperation> allOperations = new ArrayList<>();

    // Add stage-specific operations
    for (Map.Entry<String, List<FieldOperation>> entry : stageOperations.entrySet()) {
      String stageName = entry.getKey();
      List<FieldOperation> operations = entry.getValue();

      // Prefix operation names with stage name for clarity
      List<FieldOperation> prefixedOperations = operations.stream()
          .map(op -> new FieldOperation(
              stageName + "." + op.getName(),
              "[" + stageName + "] " + op.getDescription(),
              op.getType(),
              op.getInputs(),
              op.getOutputs()
          ))
          .collect(Collectors.toList());

      allOperations.addAll(prefixedOperations);
    }

    // Add pipeline-level operations
    allOperations.add(new FieldReadOperation("pipeline.source_read",
        "Pipeline source data read",
        Arrays.asList("raw_data.*")));
    allOperations.add(new FieldWriteOperation("pipeline.sink_write",
        "Pipeline final data write",
        Arrays.asList("processed_data.*")));

    // Record complete lineage
    recorder.record(allOperations);
  }
}
```

## Lineage Utilities and Best Practices

### Lineage Operation Builder

```java
public class LineageOperationBuilder {

  public static class ReadOperationBuilder {
    private String name;
    private String description;
    private List<String> fields = new ArrayList<>();

    public ReadOperationBuilder name(String name) {
      this.name = name;
      return this;
    }

    public ReadOperationBuilder description(String description) {
      this.description = description;
      return this;
    }

    public ReadOperationBuilder fields(String... fields) {
      this.fields.addAll(Arrays.asList(fields));
      return this;
    }

    public FieldReadOperation build() {
      return new FieldReadOperation(name, description, fields);
    }
  }

  public static class TransformOperationBuilder {
    private String name;
    private String description;
    private List<String> inputs = new ArrayList<>();
    private List<String> outputs = new ArrayList<>();

    public TransformOperationBuilder name(String name) {
      this.name = name;
      return this;
    }

    public TransformOperationBuilder description(String description) {
      this.description = description;
      return this;
    }

    public TransformOperationBuilder inputs(String... inputs) {
      this.inputs.addAll(Arrays.asList(inputs));
      return this;
    }

    public TransformOperationBuilder outputs(String... outputs) {
      this.outputs.addAll(Arrays.asList(outputs));
      return this;
    }

    public FieldTransformOperation build() {
      return new FieldTransformOperation(name, description, inputs, outputs);
    }
  }

  public static ReadOperationBuilder read() {
    return new ReadOperationBuilder();
  }

  public static TransformOperationBuilder transform() {
    return new TransformOperationBuilder();
  }

  public static FieldWriteOperation write(String name, String description, String... fields) {
    return new FieldWriteOperation(name, description, Arrays.asList(fields));
  }
}

// Usage example:
List<FieldOperation> operations = Arrays.asList(
    LineageOperationBuilder.read()
        .name("read_source")
        .description("Read source customer data")
        .fields("customer_id", "first_name", "last_name", "email")
        .build(),

    LineageOperationBuilder.transform()
        .name("standardize_names")
        .description("Standardize name fields")
        .inputs("first_name", "last_name")
        .outputs("std_first_name", "std_last_name")
        .build(),

    LineageOperationBuilder.write("write_output",
        "Write standardized customer data",
        "customer_id", "std_first_name", "std_last_name", "email")
);
```

### Lineage Analysis Tools

```java
public class LineageAnalyzer {

  public static Map<String, Set<String>> buildFieldDependencyGraph(List<FieldOperation> operations) {
    Map<String, Set<String>> dependencies = new HashMap<>();

    for (FieldOperation operation : operations) {
      if (operation.getType() == OperationType.TRANSFORM) {
        for (String output : operation.getOutputs()) {
          dependencies.computeIfAbsent(output, k -> new HashSet<>())
              .addAll(operation.getInputs());
        }
      }
    }

    return dependencies;
  }

  public static List<String> getFieldLineage(String fieldName,
                                             Map<String, Set<String>> dependencyGraph) {
    List<String> lineage = new ArrayList<>();
    Set<String> visited = new HashSet<>();

    buildLineage(fieldName, dependencyGraph, lineage, visited);
    Collections.reverse(lineage); // Reverse to show source-to-target order

    return lineage;
  }

  private static void buildLineage(String fieldName,
                                   Map<String, Set<String>> dependencyGraph,
                                   List<String> lineage,
                                   Set<String> visited) {
    if (visited.contains(fieldName)) {
      return; // Avoid cycles
    }

    visited.add(fieldName);
    lineage.add(fieldName);

    Set<String> dependencies = dependencyGraph.get(fieldName);
    if (dependencies != null) {
      for (String dependency : dependencies) {
        buildLineage(dependency, dependencyGraph, lineage, visited);
      }
    }
  }

  public static Set<String> findImpactedFields(String sourceField,
                                               Map<String, Set<String>> dependencyGraph) {
    Set<String> impacted = new HashSet<>();

    for (Map.Entry<String, Set<String>> entry : dependencyGraph.entrySet()) {
      if (entry.getValue().contains(sourceField)) {
        impacted.add(entry.getKey());
        // Recursively find fields impacted by this field
        impacted.addAll(findImpactedFields(entry.getKey(), dependencyGraph));
      }
    }

    return impacted;
  }
}
```

## Metadata and Governance Integration

### Metadata Enrichment

```java
public class MetadataEnrichedLineage {

  public static class EnrichedFieldOperation extends FieldOperation {
    private final Map<String, Object> metadata;

    public EnrichedFieldOperation(FieldOperation baseOperation,
                                  Map<String, Object> metadata) {
      super(baseOperation.getName(), baseOperation.getDescription(),
          baseOperation.getType(), baseOperation.getInputs(),
          baseOperation.getOutputs());
      this.metadata = metadata;
    }

    public Map<String, Object> getMetadata() {
      return metadata;
    }

    public Object getMetadata(String key) {
      return metadata.get(key);
    }
  }

  public static EnrichedFieldOperation enrichWithMetadata(FieldOperation operation,
                                                          Schema inputSchema,
                                                          Schema outputSchema) {
    Map<String, Object> metadata = new HashMap<>();

    // Add schema information
    metadata.put("inputSchema", inputSchema != null ? inputSchema.toString() : null);
    metadata.put("outputSchema", outputSchema != null ? outputSchema.toString() : null);

    // Add field type information
    if (inputSchema != null) {
      Map<String, String> inputTypes = new HashMap<>();
      for (String fieldName : operation.getInputs()) {
        Schema.Field field = inputSchema.getField(fieldName);
        if (field != null) {
          inputTypes.put(fieldName, field.getSchema().getType().toString());
        }
      }
      metadata.put("inputFieldTypes", inputTypes);
    }

    if (outputSchema != null) {
      Map<String, String> outputTypes = new HashMap<>();
      for (String fieldName : operation.getOutputs()) {
        Schema.Field field = outputSchema.getField(fieldName);
        if (field != null) {
          outputTypes.put(fieldName, field.getSchema().getType().toString());
        }
      }
      metadata.put("outputFieldTypes", outputTypes);
    }

    // Add timestamp
    metadata.put("recordedAt", Instant.now().toString());

    // Add operation complexity
    metadata.put("complexity", calculateComplexity(operation));

    return new EnrichedFieldOperation(operation, metadata);
  }

  private static String calculateComplexity(FieldOperation operation) {
    int inputCount = operation.getInputs().size();
    int outputCount = operation.getOutputs().size();

    if (inputCount == 1 && outputCount == 1) {
      return "SIMPLE";
    } else if (inputCount > 1 && outputCount == 1) {
      return "AGGREGATION";
    } else if (inputCount == 1 && outputCount > 1) {
      return "EXPANSION";
    } else {
      return "COMPLEX";
    }
  }
}
```