# Metadata Management

Comprehensive metadata management system for tracking data lineage, ML workflows, and audit trails with support for multiple storage backends. The metadata framework provides standardized interfaces for capturing, storing, and querying metadata across distributed data processing pipelines, with Kafka streaming and logging-based implementations.

## Capabilities

### Metadata API Interface

Abstract base interface defining standardized metadata operations for creating and retrieving metadata across different storage backends and implementation strategies.

```python { .api }
class MetadataAPI(ABC):
    """
    API for a metadata service.
    """

    @abstractmethod
    def create_metadata(self, metadata: MetadataModel) -> None:
        """
        Method to create metadata.

        Parameters:
        - metadata: MetadataModel - Metadata object to create
        """
        ...

    @abstractmethod
    def get_metadata(self, search_params: Dict[str, any]) -> List[MetadataModel]:
        """
        Method to get metadata from search criteria.

        Parameters:
        - search_params: Dict[str, any] - Search parameters for metadata query

        Returns:
        List[MetadataModel] - List of matching metadata objects
        """
        ...
```
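New storage backends plug in by subclassing `MetadataAPI`. The sketch below is a hypothetical in-memory backend (not part of the library; the `aissemble_core_metadata.metadata_api` import path is an assumption) that illustrates the minimal contract: persist records in `create_metadata` and filter them in `get_metadata`.

```python
from typing import Dict, List

from aissemble_core_metadata.metadata_api import MetadataAPI  # assumed module path
from aissemble_core_metadata.metadata_model import MetadataModel


class InMemoryMetadataAPIService(MetadataAPI):
    """Hypothetical in-memory backend, handy for prototypes and unit tests."""

    def __init__(self) -> None:
        self._records: List[MetadataModel] = []

    def create_metadata(self, metadata: MetadataModel) -> None:
        # Keep the record in memory so it can be queried back later
        self._records.append(metadata)

    def get_metadata(self, search_params: Dict[str, any]) -> List[MetadataModel]:
        # Return records whose attributes match every supplied search parameter
        return [
            record
            for record in self._records
            if all(getattr(record, key, None) == value for key, value in search_params.items())
        ]
```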
### Metadata Data Model

Standardized data model for capturing comprehensive metadata information including resource identifiers, subjects, actions, timestamps, and extensible key-value pairs for additional context.

```python { .api }
class MetadataModel(BaseModel):
    """
    Class that represents a common metadata model.

    Attributes:
    - resource: str - The identifier of the data (default: random UUID)
    - subject: str - The thing acting on the data (default: empty string)
    - action: str - The action being taken (default: empty string)
    - timestamp: float - Time when action occurred as Unix timestamp (default: current timestamp)
    - additionalValues: Dict[str, str] - Additional key-value pairs (default: empty dict)
    """
    resource: str = uuid4().hex
    subject: str = ""
    action: str = ""
    timestamp: float = datetime.now().timestamp()
    additionalValues: Dict[str, str] = dict()
```
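One caveat worth noting: the `resource` and `timestamp` defaults above are evaluated once, when the class body is executed, so instances that omit those fields share the same default UUID and timestamp. When each event needs its own identifier and time, pass them explicitly, as in this minimal sketch:

```python
from datetime import datetime
from uuid import uuid4

from aissemble_core_metadata.metadata_model import MetadataModel

# Supply resource and timestamp explicitly so every event gets fresh values
event = MetadataModel(
    resource=uuid4().hex,
    subject="etl-pipeline-001",
    action="DATA_PROCESSED",
    timestamp=datetime.now().timestamp(),
)
```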
62
63
### Hive Metadata Service
64
65
Production-ready metadata service implementation using Kafka for high-throughput metadata streaming with MessagingConfig integration for distributed data processing workflows.
66
67
```python { .api }
68
class HiveMetadataAPIService(MetadataAPI):
69
"""
70
Class to handle basic logging of metadata.
71
72
Class Attributes:
73
- logger - LogManager instance for HiveMetadataAPIService
74
"""
75
76
def __init__(self) -> None:
77
"""Initialize with MessagingConfig and KafkaProducer"""
78
...
79
80
def create_metadata(self, metadata: MetadataModel) -> None:
81
"""
82
Creates metadata by sending to Kafka.
83
84
Parameters:
85
- metadata: MetadataModel - Metadata to send to Kafka topic
86
"""
87
...
88
89
def get_metadata(self, search_params: Dict[str, any]) -> List[MetadataModel]:
90
"""
91
Returns empty list (not implemented).
92
93
Parameters:
94
- search_params: Dict[str, any] - Search parameters (unused)
95
96
Returns:
97
List[MetadataModel] - Empty list (retrieval not implemented)
98
"""
99
...
100
```
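Since `create_metadata` publishes to a Kafka topic, publishing can fail, for example when brokers are unreachable. A small defensive wrapper like the sketch below (an illustration, not part of the library) keeps metadata publishing problems from aborting the main processing job:

```python
import logging

from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from aissemble_core_metadata.metadata_model import MetadataModel

logger = logging.getLogger(__name__)


def publish_metadata_safely(service: HiveMetadataAPIService, metadata: MetadataModel) -> bool:
    """Attempt to publish metadata; log and continue if publishing fails."""
    try:
        service.create_metadata(metadata)
        return True
    except Exception:  # broad catch: losing a metadata record should not fail the pipeline
        logger.exception("Failed to publish metadata for resource %s", metadata.resource)
        return False
```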
### Logging Metadata Service

Development and testing metadata service implementation using logging output for metadata tracking, designed for non-production environments and debugging workflows.

```python { .api }
class LoggingMetadataAPIService(MetadataAPI):
    """
    Class to handle basic logging of metadata.
    Intended for testing purposes and not suited for production.

    Class Attributes:
    - logger - LogManager instance for LoggingMetadataAPIService
    """

    def create_metadata(self, metadata: MetadataModel) -> None:
        """
        Logs metadata information.

        Parameters:
        - metadata: MetadataModel - Metadata to log
        """
        ...

    def get_metadata(self, search_params: Dict[str, any]) -> List[MetadataModel]:
        """
        Returns empty list (not implemented).

        Parameters:
        - search_params: Dict[str, any] - Search parameters (unused)

        Returns:
        List[MetadataModel] - Empty list (retrieval not implemented)
        """
        ...
```
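Because it only writes to the log and needs no Kafka connection, `LoggingMetadataAPIService` is a convenient stand-in during unit tests. A pytest-style sketch, assuming your pipeline code accepts any `MetadataAPI` implementation:

```python
from aissemble_core_metadata.logging_metadata_api_service import LoggingMetadataAPIService
from aissemble_core_metadata.metadata_model import MetadataModel


def test_metadata_tracking_without_kafka():
    service = LoggingMetadataAPIService()

    # create_metadata only logs, so no external broker is required
    service.create_metadata(MetadataModel(subject="unit-test", action="TEST_RUN"))

    # Retrieval is not implemented for this backend, so an empty list is expected
    assert service.get_metadata({"subject": "unit-test"}) == []
```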
## Usage Examples

### Basic Metadata Tracking

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime

# Create metadata for data processing event
metadata = MetadataModel(
    resource="customer-transactions-2024-09",
    subject="etl-pipeline-001",
    action="DATA_PROCESSED",
    timestamp=datetime.now().timestamp(),
    additionalValues={
        "records_processed": "1500000",
        "processing_duration": "45_minutes",
        "data_quality_score": "0.95"
    }
)

# Initialize Hive metadata service (uses Kafka)
metadata_service = HiveMetadataAPIService()

# Send metadata to Kafka topic
metadata_service.create_metadata(metadata)
print(f"Metadata sent for resource: {metadata.resource}")
```
### ML Training Metadata Tracking

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import uuid

class MLTrainingMetadataTracker:
    """Track comprehensive metadata for ML training workflows"""

    def __init__(self):
        self.metadata_service = HiveMetadataAPIService()
        self.training_session_id = str(uuid.uuid4())

    def track_training_start(self, model_name: str, dataset_path: str):
        """Track training session initiation"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="ml-training-pipeline",
            action="TRAINING_STARTED",
            additionalValues={
                "model_name": model_name,
                "dataset_path": dataset_path,
                "training_environment": "production",
                "framework": "scikit-learn"
            }
        )
        self.metadata_service.create_metadata(metadata)
        print(f"Training started for {model_name}")

    def track_data_loading(self, records_count: int, features_count: int):
        """Track data loading phase"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="data-loader",
            action="DATA_LOADED",
            additionalValues={
                "records_count": str(records_count),
                "features_count": str(features_count),
                "data_validation": "passed",
                "missing_values_handled": "true"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_feature_engineering(self, original_features: list, selected_features: list):
        """Track feature engineering process"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="feature-engineer",
            action="FEATURES_ENGINEERED",
            additionalValues={
                "original_feature_count": str(len(original_features)),
                "selected_feature_count": str(len(selected_features)),
                "feature_selection_method": "recursive_feature_elimination",
                "dimensionality_reduction": "applied"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_model_training(self, algorithm: str, hyperparameters: dict):
        """Track model training process"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="model-trainer",
            action="MODEL_TRAINED",
            additionalValues={
                "algorithm": algorithm,
                **{f"param_{k}": str(v) for k, v in hyperparameters.items()},
                "cross_validation_folds": "5",
                "training_time": "25_minutes"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_model_evaluation(self, metrics: dict):
        """Track model evaluation results"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="model-evaluator",
            action="MODEL_EVALUATED",
            additionalValues={
                **{f"metric_{k}": str(v) for k, v in metrics.items()},
                "evaluation_dataset": "holdout_test_set",
                "evaluation_method": "stratified_split"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_training_completion(self, model_path: str, status: str):
        """Track training completion"""
        metadata = MetadataModel(
            resource=self.training_session_id,
            subject="ml-training-pipeline",
            action="TRAINING_COMPLETED",
            additionalValues={
                "final_status": status,
                "model_artifact_path": model_path,
                "total_training_time": "2_hours_15_minutes",
                "model_size_mb": "12.5"
            }
        )
        self.metadata_service.create_metadata(metadata)
        print(f"Training completed with status: {status}")

# Usage example
tracker = MLTrainingMetadataTracker()

# Track complete ML training workflow
tracker.track_training_start("fraud_detection_v2", "s3://data/fraud_training.parquet")
tracker.track_data_loading(records_count=1000000, features_count=47)
tracker.track_feature_engineering(
    original_features=["age", "income", "transaction_amount", "merchant", "location"],
    selected_features=["age", "income", "transaction_amount"]
)
tracker.track_model_training(
    algorithm="RandomForestClassifier",
    hyperparameters={"n_estimators": 100, "max_depth": 10, "min_samples_split": 2}
)
tracker.track_model_evaluation({
    "accuracy": 0.94,
    "precision": 0.91,
    "recall": 0.96,
    "f1_score": 0.93
})
tracker.track_training_completion("s3://models/fraud_detection_v2.pkl", "SUCCESS")
```
### Data Pipeline Lineage Tracking

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import json

class DataLineageTracker:
    """Track data lineage across processing pipeline stages"""

    def __init__(self, pipeline_id: str):
        self.pipeline_id = pipeline_id
        self.metadata_service = HiveMetadataAPIService()
        self.stage_counter = 0

    def track_data_ingestion(self, source_path: str, records_ingested: int):
        """Track data ingestion from external source"""
        self.stage_counter += 1

        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-ingestion",
            action="DATA_INGESTED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "source_path": source_path,
                "records_ingested": str(records_ingested),
                "ingestion_method": "spark_batch",
                "data_format": "parquet"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_data_transformation(self, transformation_type: str, input_records: int, output_records: int):
        """Track data transformation operations"""
        self.stage_counter += 1

        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-transformer",
            action="DATA_TRANSFORMED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "transformation_type": transformation_type,
                "input_records": str(input_records),
                "output_records": str(output_records),
                "transformation_time": str(datetime.now().timestamp())
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_data_quality_check(self, quality_rules: dict, quality_score: float):
        """Track data quality validation"""
        self.stage_counter += 1

        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="quality-checker",
            action="QUALITY_VALIDATED",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "quality_score": str(quality_score),
                "quality_rules": json.dumps(quality_rules),
                "validation_passed": str(quality_score >= 0.8)
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_data_output(self, output_path: str, records_written: int):
        """Track data output to destination"""
        self.stage_counter += 1

        metadata = MetadataModel(
            resource=f"{self.pipeline_id}_stage_{self.stage_counter}",
            subject="data-writer",
            action="DATA_WRITTEN",
            additionalValues={
                "pipeline_id": self.pipeline_id,
                "stage_number": str(self.stage_counter),
                "output_path": output_path,
                "records_written": str(records_written),
                "write_format": "delta",
                "partition_strategy": "date_based"
            }
        )
        self.metadata_service.create_metadata(metadata)

    def track_pipeline_completion(self, status: str, total_runtime: str):
        """Track overall pipeline completion"""
        metadata = MetadataModel(
            resource=self.pipeline_id,
            subject="pipeline-orchestrator",
            action="PIPELINE_COMPLETED",
            additionalValues={
                "final_status": status,
                "total_stages": str(self.stage_counter),
                "total_runtime": total_runtime,
                "completion_time": str(datetime.now().timestamp())
            }
        )
        self.metadata_service.create_metadata(metadata)

# Usage example
lineage_tracker = DataLineageTracker("customer_analytics_pipeline_20240905")

# Track complete data pipeline
lineage_tracker.track_data_ingestion("s3://raw-data/customers.parquet", 2500000)
lineage_tracker.track_data_transformation("deduplication", 2500000, 2450000)
lineage_tracker.track_data_transformation("enrichment", 2450000, 2450000)
lineage_tracker.track_data_quality_check(
    quality_rules={"completeness": 0.95, "validity": 0.98, "consistency": 0.92},
    quality_score=0.95
)
lineage_tracker.track_data_output("s3://processed-data/customer_analytics.delta", 2450000)
lineage_tracker.track_pipeline_completion("SUCCESS", "1_hour_23_minutes")
```
### Development vs Production Metadata Services

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from aissemble_core_metadata.logging_metadata_api_service import LoggingMetadataAPIService
import os

class MetadataServiceFactory:
    """Factory for creating appropriate metadata service based on environment"""

    @staticmethod
    def create_metadata_service():
        """Create metadata service based on environment"""
        environment = os.getenv("ENVIRONMENT", "development")

        if environment == "production":
            print("Using Hive metadata service (Kafka)")
            return HiveMetadataAPIService()
        else:
            print("Using logging metadata service (Console)")
            return LoggingMetadataAPIService()

class UniversalMetadataTracker:
    """Metadata tracker that works across development and production"""

    def __init__(self):
        self.metadata_service = MetadataServiceFactory.create_metadata_service()

    def track_event(self, resource: str, subject: str, action: str, **additional_values):
        """Track any type of event with flexible additional values"""
        metadata = MetadataModel(
            resource=resource,
            subject=subject,
            action=action,
            additionalValues={str(k): str(v) for k, v in additional_values.items()}
        )

        self.metadata_service.create_metadata(metadata)
        return metadata

    def track_batch_events(self, events: list):
        """Track multiple events in sequence"""
        tracked_events = []

        for event in events:
            metadata = self.track_event(**event)
            tracked_events.append(metadata)

        return tracked_events

# Usage example
tracker = UniversalMetadataTracker()

# Single event tracking
tracker.track_event(
    resource="user_behavior_analysis",
    subject="analytics_engine",
    action="ANALYSIS_COMPLETED",
    user_count=150000,
    analysis_type="behavioral_segmentation",
    execution_time="45_minutes"
)

# Batch event tracking
batch_events = [
    {
        "resource": "daily_report_001",
        "subject": "report_generator",
        "action": "REPORT_STARTED",
        "report_type": "sales_summary"
    },
    {
        "resource": "daily_report_001",
        "subject": "data_aggregator",
        "action": "DATA_AGGREGATED",
        "records_processed": 500000
    },
    {
        "resource": "daily_report_001",
        "subject": "report_generator",
        "action": "REPORT_COMPLETED",
        "output_path": "s3://reports/daily_sales_20240905.pdf"
    }
]

tracked = tracker.track_batch_events(batch_events)
print(f"Tracked {len(tracked)} events")
```
### Custom Metadata Extensions

```python
from aissemble_core_metadata.metadata_model import MetadataModel
from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from datetime import datetime
import json

class EnhancedMetadataModel(MetadataModel):
    """Extended metadata model with domain-specific fields"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def add_ml_context(self, model_version: str, experiment_id: str, dataset_version: str):
        """Add ML-specific context"""
        self.additionalValues.update({
            "ml_model_version": model_version,
            "ml_experiment_id": experiment_id,
            "ml_dataset_version": dataset_version,
            "ml_context_added": str(datetime.now().timestamp())
        })
        return self

    def add_data_context(self, schema_version: str, partition_info: dict, quality_metrics: dict):
        """Add data processing context"""
        self.additionalValues.update({
            "data_schema_version": schema_version,
            "data_partition_info": json.dumps(partition_info),
            "data_quality_metrics": json.dumps(quality_metrics),
            "data_context_added": str(datetime.now().timestamp())
        })
        return self

    def add_infrastructure_context(self, cluster_id: str, node_count: int, resource_usage: dict):
        """Add infrastructure context"""
        self.additionalValues.update({
            "infra_cluster_id": cluster_id,
            "infra_node_count": str(node_count),
            "infra_resource_usage": json.dumps(resource_usage),
            "infra_context_added": str(datetime.now().timestamp())
        })
        return self

class DomainSpecificMetadataService:
    """Service for domain-specific metadata operations"""

    def __init__(self):
        self.base_service = HiveMetadataAPIService()

    def track_ml_inference(self, model_id: str, request_count: int, latency_ms: float):
        """Track ML inference events"""
        metadata = EnhancedMetadataModel(
            resource=f"inference_{model_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            subject="ml_inference_service",
            action="INFERENCE_EXECUTED"
        ).add_ml_context(
            model_version="v2.1.0",
            experiment_id="exp_001",
            dataset_version="v1.5"
        ).add_infrastructure_context(
            cluster_id="ml-cluster-prod",
            node_count=5,
            resource_usage={"cpu_percent": 75, "memory_gb": 32}
        )

        metadata.additionalValues.update({
            "request_count": str(request_count),
            "average_latency_ms": str(latency_ms),
            "throughput_rps": str(request_count / (latency_ms / 1000))
        })

        self.base_service.create_metadata(metadata)
        return metadata

    def track_data_pipeline_stage(self, pipeline_id: str, stage_name: str,
                                  input_size: int, output_size: int):
        """Track data pipeline stage execution"""
        metadata = EnhancedMetadataModel(
            resource=f"{pipeline_id}_{stage_name}",
            subject="data_pipeline",
            action="STAGE_EXECUTED"
        ).add_data_context(
            schema_version="v1.0",
            partition_info={"partition_by": "date", "partition_count": 30},
            quality_metrics={"completeness": 0.98, "accuracy": 0.95}
        )

        metadata.additionalValues.update({
            "stage_name": stage_name,
            "input_record_count": str(input_size),
            "output_record_count": str(output_size),
            "data_reduction_ratio": str(output_size / input_size if input_size > 0 else 0)
        })

        self.base_service.create_metadata(metadata)
        return metadata

# Usage example
domain_service = DomainSpecificMetadataService()

# Track ML inference
inference_metadata = domain_service.track_ml_inference(
    model_id="fraud_detection_model",
    request_count=1000,
    latency_ms=45.0
)

# Track data pipeline stage
pipeline_metadata = domain_service.track_data_pipeline_stage(
    pipeline_id="customer_segmentation",
    stage_name="feature_engineering",
    input_size=1000000,
    output_size=950000
)

print(f"Tracked inference: {inference_metadata.resource}")
print(f"Tracked pipeline stage: {pipeline_metadata.resource}")
```
## Best Practices

### Metadata Design
- Use consistent resource naming conventions
- Include comprehensive context in additionalValues
- Track both successful and failed operations
- Implement structured logging for debugging

### Service Selection
- Use LoggingMetadataAPIService for development and testing
- Use HiveMetadataAPIService for production environments
- Consider custom implementations for specific requirements
- Plan for metadata service migration strategies

### Performance Considerations
- Batch metadata operations when possible
- Use asynchronous patterns for high-throughput scenarios (see the sketch after this list)
- Monitor Kafka topic health and throughput
- Implement metadata retention policies
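For high-volume pipelines, one way to batch and decouple metadata publishing is to queue records and flush them from a background worker. The sketch below is an illustration, not a library feature; it wraps any `MetadataAPI` implementation behind a simple queue-and-thread pattern.

```python
import queue
import threading

from aissemble_core_metadata.hive_metadata_api_service import HiveMetadataAPIService
from aissemble_core_metadata.metadata_model import MetadataModel


class AsyncMetadataPublisher:
    """Queue metadata records and publish them from a background thread."""

    def __init__(self, service, maxsize: int = 10000):
        self._service = service
        self._queue = queue.Queue(maxsize=maxsize)
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def submit(self, metadata: MetadataModel) -> None:
        # Hand off to the background worker so the caller does not wait on Kafka
        self._queue.put(metadata)

    def _drain(self) -> None:
        while True:
            metadata = self._queue.get()
            try:
                self._service.create_metadata(metadata)
            finally:
                self._queue.task_done()

    def flush(self) -> None:
        # Block until all queued records have been published
        self._queue.join()


# Example: wrap the production Kafka-backed service
publisher = AsyncMetadataPublisher(HiveMetadataAPIService())
publisher.submit(MetadataModel(subject="etl-pipeline-001", action="DATA_PROCESSED"))
publisher.flush()
```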
### Governance and Compliance
- Establish metadata standards across teams
- Perform regular metadata quality audits
- Implement data lineage tracking
- Maintain metadata schema versioning