ZenML is a unified MLOps framework that extends battle-tested machine learning operations principles to support the entire AI stack, from classical machine learning models to advanced AI agents.
Functions for logging metadata and managing tags across ZenML resources. Metadata enables tracking custom information about pipeline runs, steps, artifacts, and models. Tags provide categorization and filtering capabilities.
def log_metadata(
metadata: dict,
infer_resource: bool = True
):
"""
Generic function to log metadata.
Automatically infers the resource (step run, pipeline run, artifact)
from the execution context.
Parameters:
- metadata: Metadata dict to log (keys must be strings)
- infer_resource: Infer resource from context (default: True)
Example:
```python
from zenml import step, log_metadata
@step
def my_step(data: list):
# Log metadata automatically associated with step run
log_metadata({
"data_size": len(data),
"processing_time": "5s"
})
```
"""Import from:
from zenml import log_metadatadef log_step_metadata(
metadata: dict,
step_name: str = None
):
"""
Log metadata for a step.
Parameters:
- metadata: Metadata dict to log (keys must be strings)
- step_name: Step name (uses current context if None)
Example:
```python
from zenml import step, log_step_metadata
@step
def training_step(data: list):
# Training logic
log_step_metadata({
"training_samples": len(data),
"epochs": 10,
"optimizer": "adam"
})
```
"""Import from:
from zenml import log_step_metadatadef log_artifact_metadata(
metadata: dict,
artifact_name: str = None,
artifact_version: str = None
):
"""
Log metadata for an artifact.
Parameters:
- metadata: Metadata dict to log (keys must be strings)
- artifact_name: Artifact name (uses current context if None)
- artifact_version: Artifact version
Example:
```python
from zenml import step, log_artifact_metadata
@step
def process_data(data: list) -> dict:
processed = {"data": data}
log_artifact_metadata({
"rows": len(data),
"validation_status": "passed"
})
return processed
```
"""Import from:
from zenml import log_artifact_metadatadef log_model_metadata(
metadata: dict,
model_name: str = None,
model_version: str = None
):
"""
Log metadata for a model version.
Parameters:
- metadata: Metadata dict to log (keys must be strings)
- model_name: Model name (uses current context if None)
- model_version: Model version
Example:
```python
from zenml import step, log_model_metadata, Model
from zenml import pipeline
@step
def train_model(data: list) -> dict:
model = {"weights": [0.1, 0.2]}
log_model_metadata({
"framework": "custom",
"training_time": "300s",
"accuracy": 0.95
})
return model
@pipeline(model=Model(name="classifier", version="1.0"))
def training_pipeline():
train_model([1, 2, 3])
```
"""Import from:
from zenml import log_model_metadataclass Tag:
"""
Tag model for categorizing resources.
Attributes:
- id: Tag UUID
- name: Tag name
- color: Tag color (from ColorVariants enum)
- tagged_count: Number of resources with this tag
"""Import from:
from zenml import Tagdef add_tags(
tags: list,
*,
pipeline: str = None,
run: str = None,
run_template: str = None,
snapshot: str = None,
deployment: str = None,
artifact: str = None,
artifact_version_id: str = None,
artifact_name: str = None,
artifact_version: str = None,
infer_artifact: bool = None
):
"""
Add tags to various resource types.
Supply exactly one resource identifier. When called without arguments inside a step,
tags the current pipeline run by default.
Parameters:
- tags: List of tag names or Tag objects to add
- pipeline: ID or name of pipeline to tag
- run: ID, name, or prefix of pipeline run to tag
- run_template: ID or name of run template to tag
- snapshot: ID of pipeline snapshot to tag
- deployment: ID or name of deployment to tag
- artifact: ID or name of artifact to tag
- artifact_version_id: UUID of artifact version to tag
- artifact_name: Name of artifact to tag (for use in step context)
- artifact_version: Version of artifact to tag (with artifact_name)
- infer_artifact: Infer artifact version from step context
Example:
```python
from zenml import add_tags, step
from zenml.client import Client
# Tag a pipeline
add_tags(tags=["production", "v2"], pipeline="ml_pipeline")
# Tag a pipeline run
add_tags(tags=["successful"], run="run_12345")
# Tag an artifact
client = Client()
artifact = client.get_artifact("my_dataset")
add_tags(tags=["validated", "production"], artifact=artifact.id)
# Inside a step - tags the current pipeline run
@step
def my_step():
add_tags(tags=["experimental"])
# Tag step output artifact from within step
@step
def my_step() -> dict:
add_tags(tags=["important"], artifact_name="output")
return {"data": "value"}
```
Raises:
ValueError: If no identifiers provided outside step context, or multiple identifiers provided
"""Import from:
from zenml import add_tagsdef remove_tags(
tags: list,
*,
pipeline: str = None,
run: str = None,
run_template: str = None,
snapshot: str = None,
deployment: str = None,
artifact: str = None,
artifact_version_id: str = None,
artifact_name: str = None,
artifact_version: str = None,
infer_artifact: bool = None
):
"""
Remove tags from various resource types.
Supply exactly one resource identifier. When called without arguments inside a step,
removes tags from the current pipeline run by default.
Parameters:
- tags: List of tag names to remove
- pipeline: ID or name of pipeline
- run: ID, name, or prefix of pipeline run
- run_template: ID or name of run template
- snapshot: ID of pipeline snapshot
- deployment: ID or name of deployment
- artifact: ID or name of artifact
- artifact_version_id: UUID of artifact version
- artifact_name: Name of artifact (for use in step context)
- artifact_version: Version of artifact (with artifact_name)
- infer_artifact: Infer artifact version from step context
Example:
```python
from zenml import remove_tags
from zenml.client import Client
# Remove tag from pipeline
remove_tags(tags=["experimental"], pipeline="ml_pipeline")
# Remove tags from artifact
client = Client()
artifact = client.get_artifact("my_dataset")
remove_tags(tags=["staging", "deprecated"], artifact=artifact.id)
# Inside a step - removes from current pipeline run
@step
def my_step():
remove_tags(tags=["draft"])
```
Raises:
ValueError: If no identifiers provided outside step context, or multiple identifiers provided
"""Import from:
from zenml import remove_tagsfrom zenml import step, log_step_metadata
import time
@step
def data_processing(data: list) -> list:
"""Process data and log metadata."""
start_time = time.time()
processed_data = [x * 2 for x in data]
processing_time = time.time() - start_time
log_step_metadata({
"input_size": len(data),
"output_size": len(processed_data),
"processing_time_seconds": processing_time,
"transformation": "multiply_by_2"
})
return processed_data

from zenml import step, log_artifact_metadata
@step
def create_dataset(size: int) -> dict:
"""Create dataset with metadata."""
dataset = {
"features": [[i, i*2, i*3] for i in range(size)],
"labels": [i % 2 for i in range(size)]
}
log_artifact_metadata({
"dataset_size": size,
"num_features": 3,
"num_classes": 2,
"balance": "50/50",
"created_at": "2024-01-15"
})
return dataset

from zenml import step, pipeline, Model, log_model_metadata
model_config = Model(name="sentiment_classifier", version="2.0")
@step
def train_model(data: list) -> dict:
"""Train and log model metadata."""
model = {"weights": [0.1, 0.2, 0.3]}
log_model_metadata({
"architecture": "transformer",
"layers": 12,
"parameters": "110M",
"training_samples": len(data),
"training_epochs": 10,
"learning_rate": 0.001,
"optimizer": "adamw"
})
return model
@step
def evaluate_model(model: dict, test_data: list) -> dict:
"""Evaluate and log metrics."""
metrics = {
"accuracy": 0.95,
"precision": 0.93,
"recall": 0.97,
"f1": 0.95
}
log_model_metadata({
"test_accuracy": metrics["accuracy"],
"test_precision": metrics["precision"],
"test_recall": metrics["recall"],
"test_f1": metrics["f1"],
"test_samples": len(test_data)
})
return metrics
@pipeline(model=model_config)
def training_pipeline():
data = [1, 2, 3, 4, 5]
model = train_model(data)
metrics = evaluate_model(model, [6, 7, 8])

from zenml import step, log_metadata
@step
def processing_step(data: list) -> dict:
"""Step using generic metadata logging."""
# Automatically logs to step run
log_metadata({
"step_info": "processing",
"data_size": len(data)
})
result = {"processed": data}
return result

from zenml.client import Client
from zenml.enums import ColorVariants
client = Client()
# Create tags with colors
client.create_tag(name="production", color=ColorVariants.GREEN)
client.create_tag(name="staging", color=ColorVariants.YELLOW)
client.create_tag(name="experimental", color=ColorVariants.BLUE)
client.create_tag(name="deprecated", color=ColorVariants.RED)
# List all tags
tags = client.list_tags()
for tag in tags:
print(f"{tag.name}: {tag.color} ({tag.tagged_count} resources)")from zenml import save_artifact, add_tags
# Save artifact with tags
artifact = save_artifact(
data={"model": "data"},
name="my_model",
tags=["production", "v1.0", "validated"]
)
# Add more tags later using artifact version ID
add_tags(
tags=["promoted"],
artifact_version_id=artifact.id
)
# Or tag by artifact name
add_tags(tags=["important"], artifact="my_model")from zenml.client import Client
from zenml import add_tags
client = Client()
# Create model version
model = client.create_model(
name="classifier",
tags=["nlp", "classification"]
)
version = client.create_model_version(
model_name_or_id=model.id,
version="1.0.0",
tags=["production", "validated"]
)
# Note: Tagging model versions requires using Client methods
# The add_tags function currently supports pipelines, runs, artifacts, and deployments

from zenml import pipeline, step, add_tags
from zenml.client import Client
@step
def my_step():
# Tag the current pipeline run from within the step
add_tags(tags=["processing", "v2"])
@pipeline
def my_pipeline():
my_step()
# Run pipeline
my_pipeline()
# Tag a specific run by ID
client = Client()
runs = client.list_pipeline_runs(pipeline_id=client.get_pipeline("my_pipeline").id)
latest_run = runs.items[0]
add_tags(
tags=["successful", "baseline"],
run=latest_run.id
)

from zenml.client import Client
client = Client()
# List artifacts with specific tag
production_artifacts = client.list_artifacts(tag="production")
# List models with specific tag
nlp_models = client.list_models(tag="nlp")
# List pipeline runs with tag
baseline_runs = client.list_pipeline_runs(tag="baseline")

from zenml.client import Client
client = Client()
# Create run metadata
run = client.get_pipeline_run("run_id")
client.create_run_metadata(
resource_id=run.id,
resource_type="pipeline_run",
values={
"environment": "production",
"triggered_by": "scheduler",
"git_commit": "abc123"
}
)
# Query metadata
step_run = client.get_run_step("step_id")
if step_run.metadata:
for key, value in step_run.metadata.items():
print(f"{key}: {value}")from zenml import step, pipeline, Model, log_model_metadata, add_tags
model_config = Model(
name="recommender",
version="3.0",
tags=["recommendation", "collaborative-filtering"]
)
@step
def train_recommender(data: list) -> dict:
"""Train with comprehensive metadata and tags."""
model = {"embeddings": [[0.1, 0.2], [0.3, 0.4]]}
# Log detailed metadata
log_model_metadata({
"algorithm": "matrix_factorization",
"embedding_dim": 2,
"num_users": 1000,
"num_items": 500,
"training_samples": len(data),
"cold_start_strategy": "popularity_baseline"
})
# Tag the pipeline run from within step
add_tags(tags=["training", "recommender-v3"])
# Tag the output artifact
add_tags(tags=["trained-model"], artifact_name="output")
return model
@pipeline(model=model_config)
def recommender_pipeline():
model = train_recommender([1, 2, 3])
return model
# Run pipeline
recommender_pipeline()

Install with Tessl CLI:
npx tessl i tessl/pypi-zenml