# Metadata Management

A metadata management system for tracking data lineage, ML workflows, and audit trails, with support for multiple storage backends. The framework provides standardized interfaces for capturing, storing, and querying metadata across distributed data processing pipelines, with Kafka-streaming and logging-based implementations.

## Capabilities


### Metadata API Interface

Abstract base interface defining standardized metadata operations for creating and retrieving metadata across different storage backends and implementation strategies.

```python { .api }
class MetadataAPI(ABC):
    """
    API for a metadata service.
    """

    @abstractmethod
    def create_metadata(self, metadata: MetadataModel) -> None:
        """
        Method to create metadata.

        Parameters:
        - metadata: MetadataModel - Metadata object to create
        """
        ...

    @abstractmethod
    def get_metadata(self, search_params: Dict[str, any]) -> List[MetadataModel]:
        """
        Method to get metadata from search criteria.

        Parameters:
        - search_params: Dict[str, any] - Search parameters for metadata query

        Returns:
        List[MetadataModel] - List of matching metadata objects
        """
        ...
```
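
Any storage strategy can sit behind this interface. As a minimal illustrative sketch only (not part of the package, and the `aissemble_core_metadata.metadata_api` import path is an assumption), an in-memory implementation suitable for unit tests might look like this:

```python
from typing import Dict, List

from aissemble_core_metadata.metadata_api import MetadataAPI  # assumed module path
from aissemble_core_metadata.metadata_model import MetadataModel


class InMemoryMetadataAPIService(MetadataAPI):
    """Hypothetical in-memory backend, useful for unit tests."""

    def __init__(self) -> None:
        self._records: List[MetadataModel] = []

    def create_metadata(self, metadata: MetadataModel) -> None:
        # Keep the record in process memory instead of an external system.
        self._records.append(metadata)

    def get_metadata(self, search_params: Dict[str, any]) -> List[MetadataModel]:
        # Naive exact-match filtering on top-level model fields.
        return [
            record
            for record in self._records
            if all(getattr(record, key, None) == value for key, value in search_params.items())
        ]
```

The Kafka-backed and logging-backed services documented below are the implementations the package actually provides.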


### Metadata Data Model

Standardized data model for capturing comprehensive metadata information including resource identifiers, subjects, actions, timestamps, and extensible key-value pairs for additional context.

```python { .api }
class MetadataModel(BaseModel):
    """
    Class that represents a common metadata model.

    Attributes:
    - resource: str - The identifier of the data (default: random UUID)
    - subject: str - The thing acting on the data (default: empty string)
    - action: str - The action being taken (default: empty string)
    - timestamp: float - Time when action occurred as Unix timestamp (default: current timestamp)
    - additionalValues: Dict[str, str] - Additional key-value pairs (default: empty dict)
    """
    resource: str = uuid4().hex
    subject: str = ""
    action: str = ""
    timestamp: float = datetime.now().timestamp()
    additionalValues: Dict[str, str] = dict()
```
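
Fields omitted at construction time fall back to the defaults above. A minimal sketch of building an event (the `.dict()` call assumes the pydantic v1-style API implied by `BaseModel`):

```python
from aissemble_core_metadata.metadata_model import MetadataModel

# Only the fields that matter for this event are supplied; the rest use defaults.
event = MetadataModel(
    resource="orders-2024-09",
    subject="ingest-job",
    action="DATA_INGESTED",
    additionalValues={"records": "1024"},
)

# Inspect the populated model, including the defaulted timestamp.
print(event.dict())
```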


### Hive Metadata Service

Production-ready metadata service implementation using Kafka for high-throughput metadata streaming, with MessagingConfig integration for distributed data processing workflows.

```python { .api }
class HiveMetadataAPIService(MetadataAPI):
    """
    Class to handle basic logging of metadata.

    Class Attributes:
    - logger - LogManager instance for HiveMetadataAPIService
    """

    def __init__(self) -> None:
        """Initialize with MessagingConfig and KafkaProducer"""
        ...

    def create_metadata(self, metadata: MetadataModel) -> None:
        """
        Creates metadata by sending to Kafka.

        Parameters:
        - metadata: MetadataModel - Metadata to send to Kafka topic
        """
        ...

    def get_metadata(self, search_params: Dict[str, any]) -> List[MetadataModel]:
        """
        Returns empty list (not implemented).

        Parameters:
        - search_params: Dict[str, any] - Search parameters (unused)

        Returns:
        List[MetadataModel] - Empty list (retrieval not implemented)
        """
        ...
```


### Logging Metadata Service

Development and testing metadata service implementation using logging output for metadata tracking, designed for non-production environments and debugging workflows.

```python { .api }
class LoggingMetadataAPIService(MetadataAPI):
    """
    Class to handle basic logging of metadata.
    Intended for testing purposes and not suited for production.

    Class Attributes:
    - logger - LogManager instance for LoggingMetadataAPIService
    """

    def create_metadata(self, metadata: MetadataModel) -> None:
        """
        Logs metadata information.

        Parameters:
        - metadata: MetadataModel - Metadata to log
        """
        ...

    def get_metadata(self, search_params: Dict[str, any]) -> List[MetadataModel]:
        """
        Returns empty list (not implemented).

        Parameters:
        - search_params: Dict[str, any] - Search parameters (unused)

        Returns:
        List[MetadataModel] - Empty list (retrieval not implemented)
        """
        ...
```
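
Because it exposes the same `MetadataAPI` surface, the logging service can stand in for the Kafka-backed one during local development without changing calling code; a minimal sketch:

```python
from aissemble_core_metadata.logging_metadata_api_service import LoggingMetadataAPIService
from aissemble_core_metadata.metadata_model import MetadataModel

# Same interface as HiveMetadataAPIService, but writes to the log
# instead of Kafka, so no broker is required locally.
metadata_service = LoggingMetadataAPIService()
metadata_service.create_metadata(
    MetadataModel(resource="local-smoke-test", subject="developer", action="DEBUG_RUN")
)
```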


## Usage Examples


### Basic Metadata Tracking

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime

# Create metadata for data processing event
metadata = MetadataModel(
    resource="customer-transactions-2024-09",
    subject="etl-pipeline-001",
    action="DATA_PROCESSED",
    timestamp=datetime.now().timestamp(),
    additionalValues={
        "records_processed": "1500000",
        "processing_duration": "45_minutes",
        "data_quality_score": "0.95"
    }
)

# Initialize Hive metadata service (uses Kafka)
metadata_service = HiveMetadataAPIService()

# Send metadata to Kafka topic
metadata_service.create_metadata(metadata)
print(f"Metadata sent for resource: {metadata.resource}")
```


### ML Training Metadata Tracking

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import uuid

class MLTrainingMetadataTracker:
    """Track comprehensive metadata for ML training workflows"""

    def __init__(self):
        self.metadata_service = HiveMetadataAPIService()
        self.training_session_id = str(uuid.uuid4())

    def track_training_start(self, model_name: str, dataset_path: str):
        """Track training session initiation"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="ml-training-pipeline",
            action="TRAINING_STARTED",
            additionalValues={
                "model_name": model_name,
                "dataset_path": dataset_path,
                "training_environment": "production",
                "framework": "scikit-learn"
            }
        )
        self.metadata_service.create_metadata(metadata)
        print(f"Training started for {model_name}")

    def track_data_loading(self, records_count: int, features_count: int):
        """Track data loading phase"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="data-loader",
            action="DATA_LOADED",
            additionalValues={
                "records_count": str(records_count),
                "features_count": str(features_count),
                "data_validation": "passed",
                "missing_values_handled": "true"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_feature_engineering(self, original_features: list, selected_features: list):
        """Track feature engineering process"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="feature-engineer",
            action="FEATURES_ENGINEERED",
            additionalValues={
                "original_feature_count": str(len(original_features)),
                "selected_feature_count": str(len(selected_features)),
                "feature_selection_method": "recursive_feature_elimination",
                "dimensionality_reduction": "applied"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_model_training(self, algorithm: str, hyperparameters: dict):
        """Track model training process"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="model-trainer",
            action="MODEL_TRAINED",
            additionalValues={
                "algorithm": algorithm,
                **{f"param_{k}": str(v) for k, v in hyperparameters.items()},
                "cross_validation_folds": "5",
                "training_time": "25_minutes"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_model_evaluation(self, metrics: dict):
        """Track model evaluation results"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="model-evaluator",
            action="MODEL_EVALUATED",
            additionalValues={
                **{f"metric_{k}": str(v) for k, v in metrics.items()},
                "evaluation_dataset": "holdout_test_set",
                "evaluation_method": "stratified_split"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_training_completion(self, model_path: str, status: str):
        """Track training completion"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="ml-training-pipeline",
            action="TRAINING_COMPLETED",
            additionalValues={
                "final_status": status,
                "model_artifact_path": model_path,
                "total_training_time": "2_hours_15_minutes",
                "model_size_mb": "12.5"
            }
        )
        self.metadata_service.create_metadata(metadata)
        print(f"Training completed with status: {status}")

# Usage example
tracker = MLTrainingMetadataTracker()

# Track complete ML training workflow
tracker.track_training_start("fraud_detection_v2", "s3://data/fraud_training.parquet")
tracker.track_data_loading(records_count=1000000, features_count=47)
tracker.track_feature_engineering(
    original_features=["age", "income", "transaction_amount", "merchant", "location"],
    selected_features=["age", "income", "transaction_amount"]
)
tracker.track_model_training(
    algorithm="RandomForestClassifier",
    hyperparameters={"n_estimators": 100, "max_depth": 10, "min_samples_split": 2}
)
tracker.track_model_evaluation({
    "accuracy": 0.94,
    "precision": 0.91,
    "recall": 0.96,
    "f1_score": 0.93
})
tracker.track_training_completion("s3://models/fraud_detection_v2.pkl", "SUCCESS")
```


### Data Pipeline Lineage Tracking

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import json

class DataLineageTracker:
    """Track data lineage across processing pipeline stages"""

    def __init__(self, pipeline_id: str):
        self.pipeline_id = pipeline_id
        self.metadata_service = HiveMetadataAPIService()
        self.stage_counter = 0

    def track_data_ingestion(self, source_path: str, records_ingested: int):
        """Track data ingestion from external source"""
        self.stage_counter += 1

        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-ingestion",
            action="DATA_INGESTED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "source_path": source_path,
                "records_ingested": str(records_ingested),
                "ingestion_method": "spark_batch",
                "data_format": "parquet"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_data_transformation(self, transformation_type: str, input_records: int, output_records: int):
        """Track data transformation operations"""
        self.stage_counter += 1

        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-transformer",
            action="DATA_TRANSFORMED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "transformation_type": transformation_type,
                "input_records": str(input_records),
                "output_records": str(output_records),
                "transformation_time": str(datetime.now().timestamp())
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_data_quality_check(self, quality_rules: dict, quality_score: float):
        """Track data quality validation"""
        self.stage_counter += 1

        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="quality-checker",
            action="QUALITY_VALIDATED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "quality_score": str(quality_score),
                "quality_rules": json.dumps(quality_rules),
                "validation_passed": str(quality_score >= 0.8)
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_data_output(self, output_path: str, records_written: int):
        """Track data output to destination"""
        self.stage_counter += 1

        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-writer",
            action="DATA_WRITTEN",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "output_path": output_path,
                "records_written": str(records_written),
                "write_format": "delta",
                "partition_strategy": "date_based"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_pipeline_completion(self, status: str, total_runtime: str):
        """Track overall pipeline completion"""
        metadata = MetadataModel(
            resource=self.pipeline_id,
            subject="pipeline-orchestrator",
            action="PIPELINE_COMPLETED",
            additionalValues={
                "final_status": status,
                "total_stages": str(self.stage_counter),
                "total_runtime": total_runtime,
                "completion_time": str(datetime.now().timestamp())
            }
        )
        self.metadata_service.create_metadata(metadata)

# Usage example
lineage_tracker = DataLineageTracker("customer_analytics_pipeline_20240905")

# Track complete data pipeline
lineage_tracker.track_data_ingestion("s3://raw-data/customers.parquet", 2500000)
lineage_tracker.track_data_transformation("deduplication", 2500000, 2450000)
lineage_tracker.track_data_transformation("enrichment", 2450000, 2450000)
lineage_tracker.track_data_quality_check(
    quality_rules={"completeness": 0.95, "validity": 0.98, "consistency": 0.92},
    quality_score=0.95
)
lineage_tracker.track_data_output("s3://processed-data/customer_analytics.delta", 2450000)
lineage_tracker.track_pipeline_completion("SUCCESS", "1_hour_23_minutes")
```


### Development vs Production Metadata Services

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from aissemble_core_metadata.logging_metadata_api_service import LoggingMetadataAPIService
import os

class MetadataServiceFactory:
    """Factory for creating appropriate metadata service based on environment"""

    @staticmethod
    def create_metadata_service():
        """Create metadata service based on environment"""
        environment = os.getenv("ENVIRONMENT", "development")

        if environment == "production":
            print("Using Hive metadata service (Kafka)")
            return HiveMetadataAPIService()
        else:
            print("Using logging metadata service (Console)")
            return LoggingMetadataAPIService()

class UniversalMetadataTracker:
    """Metadata tracker that works across development and production"""

    def __init__(self):
        self.metadata_service = MetadataServiceFactory.create_metadata_service()

    def track_event(self, resource: str, subject: str, action: str, **additional_values):
        """Track any type of event with flexible additional values"""
        metadata = MetadataModel(
            resource=resource,
            subject=subject,
            action=action,
            additionalValues={str(k): str(v) for k, v in additional_values.items()}
        )

        self.metadata_service.create_metadata(metadata)
        return metadata

    def track_batch_events(self, events: list):
        """Track multiple events in sequence"""
        tracked_events = []

        for event in events:
            metadata = self.track_event(**event)
            tracked_events.append(metadata)

        return tracked_events

# Usage example
tracker = UniversalMetadataTracker()

# Single event tracking
tracker.track_event(
    resource="user_behavior_analysis",
    subject="analytics_engine",
    action="ANALYSIS_COMPLETED",
    user_count=150000,
    analysis_type="behavioral_segmentation",
    execution_time="45_minutes"
)

# Batch event tracking
batch_events = [
    {
        "resource": "daily_report_001",
        "subject": "report_generator",
        "action": "REPORT_STARTED",
        "report_type": "sales_summary"
    },
    {
        "resource": "daily_report_001",
        "subject": "data_aggregator",
        "action": "DATA_AGGREGATED",
        "records_processed": 500000
    },
    {
        "resource": "daily_report_001",
        "subject": "report_generator",
        "action": "REPORT_COMPLETED",
        "output_path": "s3://reports/daily_sales_20240905.pdf"
    }
]

tracked = tracker.track_batch_events(batch_events)
print(f"Tracked {len(tracked)} events")
```


### Custom Metadata Extensions

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import json

class EnhancedMetadataModel(MetadataModel):
    """Extended metadata model with domain-specific fields"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def add_ml_context(self, model_version: str, experiment_id: str, dataset_version: str):
        """Add ML-specific context"""
        self.additionalValues.update({
            "ml_model_version": model_version,
            "ml_experiment_id": experiment_id,
            "ml_dataset_version": dataset_version,
            "ml_context_added": str(datetime.now().timestamp())
        })
        return self

    def add_data_context(self, schema_version: str, partition_info: dict, quality_metrics: dict):
        """Add data processing context"""
        self.additionalValues.update({
            "data_schema_version": schema_version,
            "data_partition_info": json.dumps(partition_info),
            "data_quality_metrics": json.dumps(quality_metrics),
            "data_context_added": str(datetime.now().timestamp())
        })
        return self

    def add_infrastructure_context(self, cluster_id: str, node_count: int, resource_usage: dict):
        """Add infrastructure context"""
        self.additionalValues.update({
            "infra_cluster_id": cluster_id,
            "infra_node_count": str(node_count),
            "infra_resource_usage": json.dumps(resource_usage),
            "infra_context_added": str(datetime.now().timestamp())
        })
        return self

class DomainSpecificMetadataService:
    """Service for domain-specific metadata operations"""

    def __init__(self):
        self.base_service = HiveMetadataAPIService()

    def track_ml_inference(self, model_id: str, request_count: int, latency_ms: float):
        """Track ML inference events"""
        metadata = EnhancedMetadataModel(
            resource=f"inference_{model_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            subject="ml_inference_service",
            action="INFERENCE_EXECUTED"
        ).add_ml_context(
            model_version="v2.1.0",
            experiment_id="exp_001",
            dataset_version="v1.5"
        ).add_infrastructure_context(
            cluster_id="ml-cluster-prod",
            node_count=5,
            resource_usage={"cpu_percent": 75, "memory_gb": 32}
        )

        metadata.additionalValues.update({
            "request_count": str(request_count),
            "average_latency_ms": str(latency_ms),
            "throughput_rps": str(request_count / (latency_ms / 1000))
        })

        self.base_service.create_metadata(metadata)
        return metadata

    def track_data_pipeline_stage(self, pipeline_id: str, stage_name: str,
                                  input_size: int, output_size: int):
        """Track data pipeline stage execution"""
        metadata = EnhancedMetadataModel(
            resource=f"{pipeline_id}_{stage_name}",
            subject="data_pipeline",
            action="STAGE_EXECUTED"
        ).add_data_context(
            schema_version="v1.0",
            partition_info={"partition_by": "date", "partition_count": 30},
            quality_metrics={"completeness": 0.98, "accuracy": 0.95}
        )

        metadata.additionalValues.update({
            "stage_name": stage_name,
            "input_record_count": str(input_size),
            "output_record_count": str(output_size),
            "data_reduction_ratio": str(output_size / input_size if input_size > 0 else 0)
        })

        self.base_service.create_metadata(metadata)
        return metadata

# Usage example
domain_service = DomainSpecificMetadataService()

# Track ML inference
inference_metadata = domain_service.track_ml_inference(
    model_id="fraud_detection_model",
    request_count=1000,
    latency_ms=45.0
)

# Track data pipeline stage
pipeline_metadata = domain_service.track_data_pipeline_stage(
    pipeline_id="customer_segmentation",
    stage_name="feature_engineering",
    input_size=1000000,
    output_size=950000
)

print(f"Tracked inference: {inference_metadata.resource}")
print(f"Tracked pipeline stage: {pipeline_metadata.resource}")
```


## Best Practices


### Metadata Design

- Use consistent resource naming conventions
- Include comprehensive context in additionalValues
- Track both successful and failed operations (see the sketch after this list)
- Implement structured logging for debugging
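
For example, failures can be captured with the same model by wrapping the operation in a try/except; the action names and error fields below are illustrative assumptions, not part of the library:

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService

metadata_service = HiveMetadataAPIService()

def run_and_track(resource: str, subject: str, work):
    """Record a success or failure event around an arbitrary callable."""
    try:
        result = work()
        metadata_service.create_metadata(
            MetadataModel(resource=resource, subject=subject, action="OPERATION_SUCCEEDED")
        )
        return result
    except Exception as error:
        metadata_service.create_metadata(
            MetadataModel(
                resource=resource,
                subject=subject,
                action="OPERATION_FAILED",
                additionalValues={"error_type": type(error).__name__, "error_message": str(error)},
            )
        )
        raise
```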


### Service Selection

- Use LoggingMetadataAPIService for development and testing
- Use HiveMetadataAPIService for production environments
- Consider custom implementations for specific requirements
- Plan for metadata service migration strategies


### Performance Considerations

- Batch metadata operations when possible
- Use asynchronous patterns for high-throughput scenarios (see the sketch after this list)
- Monitor Kafka topic health and throughput
- Implement metadata retention policies
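
A minimal sketch of the batching and asynchronous patterns above, using only the documented `create_metadata` call; the buffer size, flush policy, and class name are illustrative assumptions:

```python
import asyncio
from typing import List

from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from aissemble_core_metadata.metadata_model import MetadataModel

class BufferedMetadataPublisher:
    """Collects metadata and flushes it in batches off the hot path."""

    def __init__(self, batch_size: int = 100):
        self._service = HiveMetadataAPIService()
        self._buffer: List[MetadataModel] = []
        self._batch_size = batch_size

    def add(self, metadata: MetadataModel) -> None:
        self._buffer.append(metadata)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # create_metadata is still called once per record; batching only
        # amortizes the flush point, it does not change the Kafka semantics.
        for record in self._buffer:
            self._service.create_metadata(record)
        self._buffer.clear()

    async def flush_async(self) -> None:
        # Offload the blocking flush to a worker thread so event-loop code
        # (e.g. an async request handler) is not blocked by metadata publishing.
        await asyncio.to_thread(self.flush)
```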


### Governance and Compliance

- Establish metadata standards across teams
- Audit metadata quality regularly
- Implement data lineage tracking
- Maintain metadata schema versioning