Track ML workflow lineage with artifacts, actions, associations, and contexts for governance and reproducibility.
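Conceptually, artifacts, actions, and contexts are the vertices of a directed graph, and associations are its edges. The following toy, pure-Python model is a sketch for building intuition only. `ToyLineageGraph` and its methods are hypothetical and are not part of `sagemaker.core.lineage`. It returns results in the same `Vertices`/`Edges` layout documented for `LineageQuery.query()`, and it assumes that a `Both` query is the union of the upstream and downstream walks.

```python
from collections import deque
from typing import Dict, List, Set, Tuple

Edge = Tuple[str, str, str]  # (source ARN, destination ARN, association type)


class ToyLineageGraph:
    """Hypothetical in-memory stand-in for the lineage service (illustration only)."""

    def __init__(self) -> None:
        self.vertices: Dict[str, str] = {}  # ARN -> "Artifact", "Action", or "Context"
        self.edges: List[Edge] = []

    def add_vertex(self, arn: str, entity_type: str) -> None:
        self.vertices[arn] = entity_type

    def associate(self, source: str, destination: str, association_type: str) -> None:
        self.edges.append((source, destination, association_type))

    def _walk(self, start: List[str], downstream: bool, max_depth: int) -> Tuple[Set[str], List[Edge]]:
        # Breadth-first search along edges (downstream) or against them (upstream),
        # stopping after max_depth hops from the start ARNs
        seen: Set[str] = set(start)
        used: List[Edge] = []
        frontier = deque((arn, 0) for arn in start)
        while frontier:
            arn, depth = frontier.popleft()
            if depth == max_depth:
                continue
            for edge in self.edges:
                src, dst, _ = edge
                if (src if downstream else dst) != arn:
                    continue
                used.append(edge)
                neighbor = dst if downstream else src
                if neighbor not in seen:
                    seen.add(neighbor)
                    frontier.append((neighbor, depth + 1))
        return seen, used

    def query(self, start_arns: List[str], direction: str = "Both", max_depth: int = 10) -> Dict[str, list]:
        # direction is "Ascendants", "Descendants", or "Both" (union of the two walks)
        vertices: Set[str] = set(start_arns)
        edges: List[Edge] = []
        if direction in ("Descendants", "Both"):
            seen, used = self._walk(start_arns, downstream=True, max_depth=max_depth)
            vertices |= seen
            edges += used
        if direction in ("Ascendants", "Both"):
            seen, used = self._walk(start_arns, downstream=False, max_depth=max_depth)
            vertices |= seen
            edges += used
        # Same Vertices/Edges layout as the documented LineageQuery.query() result
        return {
            "Vertices": [{"Arn": a, "Type": self.vertices[a]} for a in sorted(vertices)],
            "Edges": [{"SourceArn": s, "DestinationArn": d, "AssociationType": t}
                      for s, d, t in dict.fromkeys(edges)],
        }


graph = ToyLineageGraph()
graph.add_vertex("dataset", "Artifact")
graph.add_vertex("training", "Action")
graph.add_vertex("model", "Artifact")
graph.associate("dataset", "training", "ContributedTo")
graph.associate("training", "model", "Produced")

upstream = graph.query(["model"], direction="Ascendants")
print([v["Arn"] for v in upstream["Vertices"]])  # ['dataset', 'model', 'training']
```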
Track data artifacts throughout the ML workflow.
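Before calling `Artifact.create`, a client-side pre-check can catch requests that break the limits listed in the Artifact reference: a name of 1-120 characters, at most 30 properties, and string-to-string properties. The sketch below is a hypothetical helper, not part of the library. It mirrors only those documented limits, and the service remains the source of truth for validation.

```python
from typing import Dict, List, Optional

MAX_NAME_LENGTH = 120  # from the artifact_name limits in the Artifact reference
MAX_PROPERTIES = 30    # from the properties limit in the Artifact reference


def check_artifact_request(artifact_type: str,
                           artifact_name: Optional[str] = None,
                           properties: Optional[Dict[str, str]] = None) -> List[str]:
    """Return a list of problems; an empty list means these local checks pass."""
    problems = []
    if not artifact_type:
        problems.append("artifact_type is required")
    if artifact_name is not None and not 1 <= len(artifact_name) <= MAX_NAME_LENGTH:
        problems.append(f"artifact_name must be 1-{MAX_NAME_LENGTH} characters")
    props = properties or {}
    if len(props) > MAX_PROPERTIES:
        problems.append(f"at most {MAX_PROPERTIES} properties are allowed, got {len(props)}")
    for key, value in props.items():
        # Properties are documented as Dict[str, str], so both sides must be strings
        if not isinstance(key, str) or not isinstance(value, str):
            problems.append(f"property {key!r} must map a string key to a string value")
    return problems


print(check_artifact_request("Dataset", "training-dataset-v1", {"num_samples": "10000"}))  # []
print(check_artifact_request("Dataset", properties={"num_samples": 10000}))
```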
class Artifact:
"""
Lineage artifact for data tracking.
Parameters:
artifact_name: Optional[str] - Artifact name
- 1-120 characters
- Auto-generated if not provided
artifact_type: str - Artifact type (required)
- Examples: "Dataset", "Model", "Image", "DatasetSnapshot"
- Custom types allowed
source: Optional[Dict] - Source information with source_uri
- Structure: {"source_uri": "s3://...", "source_types": [...]}
properties: Optional[Dict[str, str]] - Artifact properties
- Metadata key-value pairs
- Maximum 30 properties
tags: Optional[List[Tag]] - Resource tags
sagemaker_session: Optional[Session] - SageMaker session
Methods:
create(artifact_name=None, artifact_type, source=None, properties=None,
tags=None, sagemaker_session=None) -> Artifact
Create new artifact.
Parameters:
artifact_name: Optional[str] - Name
artifact_type: str - Type (required)
source: Optional[Dict] - Source info
properties: Optional[Dict] - Properties
tags: Optional[List[Tag]] - Tags
sagemaker_session: Optional[Session] - Session
Returns:
Artifact: Created artifact
Raises:
ValueError: Invalid artifact_type or properties
ClientError: If artifact already exists
load(artifact_arn, sagemaker_session=None) -> Artifact
Load existing artifact by ARN.
Parameters:
artifact_arn: str - Artifact ARN (required)
sagemaker_session: Optional[Session] - Session
Returns:
Artifact: Loaded artifact
Raises:
ClientError: If artifact not found
list(source_uri=None, artifact_type=None, created_after=None,
created_before=None, sort_by="CreationTime", sort_order="Descending",
max_results=100, sagemaker_session=None) -> List[Artifact]
List artifacts with filtering.
Parameters:
source_uri: Optional[str] - Filter by source URI
artifact_type: Optional[str] - Filter by type
created_after: Optional[datetime] - Filter by creation time
created_before: Optional[datetime] - Filter by creation time
sort_by: str - Sort field (default: "CreationTime")
sort_order: str - Sort order (default: "Descending")
max_results: int - Maximum results (1-100)
sagemaker_session: Optional[Session] - Session
Returns:
List[Artifact]: Filtered artifacts list
save() -> None
Save artifact changes (properties, tags).
Raises:
ClientError: If update fails
delete() -> None
Delete artifact and all associations.
Raises:
ClientError: If deletion fails
set_tag(tag) -> None
Add single tag.
Parameters:
tag: Dict - Tag dictionary with Key and Value
set_tags(tags) -> None
Set multiple tags.
Parameters:
tags: List[Dict] - Tags list
Attributes:
artifact_arn: str - Artifact ARN
artifact_name: str - Artifact name
artifact_type: str - Artifact type
source: Dict - Source information
properties: Dict[str, str] - Artifact properties (mutable)
creation_time: datetime - Creation timestamp
created_by: Dict - Creator information
last_modified_time: datetime - Last modification timestamp
last_modified_by: Dict - Last modifier information
Notes:
- Track datasets, models, images, configs, etc.
- Properties mutable via save()
- Tags for organization and cost tracking
- Deletion removes the artifact's associations before the artifact itself
"""
Usage:
from sagemaker.core.lineage import Artifact
# Create dataset artifact
dataset_artifact = Artifact.create(
artifact_name="training-dataset-v1",
artifact_type="Dataset",
source={
"source_uri": "s3://my-bucket/datasets/train.csv",
"source_types": [
{"SourceIdType": "S3", "Value": "s3://my-bucket/datasets/train.csv"}
]
},
properties={
"num_samples": "10000",
"features": "age,income,education,occupation",
"target": "churn",
"split": "train",
"version": "1.0"
},
tags=[
{"Key": "Project", "Value": "CustomerChurn"},
{"Key": "DataSource", "Value": "CRM"}
]
)
print(f"Artifact created: {dataset_artifact.artifact_arn}")
# Load existing artifact
artifact = Artifact.load(
artifact_arn="arn:aws:sagemaker:us-west-2:123:artifact/abc123"
)
# List artifacts by source
datasets = Artifact.list(
source_uri="s3://my-bucket/datasets",
artifact_type="Dataset",
sort_by="CreationTime",
sort_order="Descending",
max_results=50
)
print(f"Found {len(datasets)} dataset artifacts")
# Update properties
artifact.properties["version"] = "2.0"
artifact.properties["last_updated"] = "2024-01-15"
artifact.save()
# Delete artifact
# artifact.delete()
Track actions performed in the ML workflow.
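Action properties, like Artifact properties, are documented as `Dict[str, str]`, which is why structured values such as hyperparameters have to be serialized (the usage example below uses `json.dumps`). A minimal sketch of a hypothetical encode/decode pair, assuming JSON as the encoding; neither function is part of `sagemaker.core.lineage`.

```python
import json
from typing import Any, Dict


def encode_properties(values: Dict[str, Any]) -> Dict[str, str]:
    """Flatten values into the string-to-string map that properties expects.

    Strings pass through unchanged; anything else (numbers, dicts, lists) is JSON-encoded.
    """
    return {key: value if isinstance(value, str) else json.dumps(value, sort_keys=True)
            for key, value in values.items()}


def decode_property(properties: Dict[str, str], key: str, default: Any = None) -> Any:
    """Parse a JSON-encoded property back into a Python value; fall back to the raw string."""
    raw = properties.get(key)
    if raw is None:
        return default
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


props = encode_properties({
    "algorithm": "xgboost",
    "hyperparameters": {"max_depth": 5, "eta": 0.2, "num_round": 100},
})
print(props["hyperparameters"])  # {"eta": 0.2, "max_depth": 5, "num_round": 100}
print(decode_property(props, "hyperparameters")["max_depth"])  # 5
print(decode_property(props, "algorithm"))  # xgboost
```

One design consequence: `decode_property` also turns numeric strings such as `"0.94"` into numbers, which is usually what you want when reading metrics back, but worth knowing when a value must stay a string.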
class Action:
"""
Lineage action for workflow steps.
Parameters:
action_name: Optional[str] - Action name
- Auto-generated if not provided
action_type: str - Action type (required)
- Examples: "Training", "Processing", "Transform", "Deployment"
source: Optional[Dict] - Source information
properties: Optional[Dict[str, str]] - Action properties
status: Optional[str] - Action status
- "InProgress", "Completed", "Failed", "Stopped"
tags: Optional[List[Tag]] - Resource tags
sagemaker_session: Optional[Session] - SageMaker session
Methods:
create(action_name=None, action_type, source=None, properties=None,
status=None, tags=None, sagemaker_session=None) -> Action
Create new action.
Returns:
Action: Created action
load(action_arn, sagemaker_session=None) -> Action
Load existing action.
Returns:
Action: Loaded action
list(source_uri=None, action_type=None, created_after=None,
created_before=None, sort_by="CreationTime", sort_order="Descending",
max_results=100, sagemaker_session=None) -> List[Action]
List actions with filtering.
Returns:
List[Action]: Filtered actions
save() -> None
Save action changes.
delete() -> None
Delete action.
set_tag(tag) -> None
Add tag.
set_tags(tags) -> None
Set multiple tags.
Attributes:
action_arn: str - Action ARN
action_name: str - Action name
action_type: str - Action type
status: Optional[str] - Action status
properties: Dict[str, str] - Action properties (mutable)
Notes:
- Track training jobs, processing jobs, deployments
- Status tracks action lifecycle
- Properties for algorithm, hyperparameters, etc.
"""
Usage:
import json
from sagemaker.core.lineage import Action
# Create training action
training_action = Action.create(
action_name="train-xgboost-v1",
action_type="Training",
source={
"source_uri": "arn:aws:sagemaker:us-west-2:123:training-job/my-job",
"source_types": [
{"SourceIdType": "ARN", "Value": "arn:aws:sagemaker:..."}
]
},
properties={
"algorithm": "xgboost",
"instance_type": "ml.m5.xlarge",
"hyperparameters": json.dumps({
"max_depth": 5,
"eta": 0.2,
"num_round": 100
})
},
status="InProgress",
tags=[{"Key": "Project", "Value": "Churn"}]
)
# ... run the training job ...
# Mark the action complete once the job finishes and record its result
training_action.status = "Completed"
training_action.properties["accuracy"] = "0.94"
training_action.save()
# List training actions created since 2024-01-01
from datetime import datetime
training_actions = Action.list(
action_type="Training",
created_after=datetime(2024, 1, 1),
sort_by="CreationTime"
)
Create relationships between artifacts and actions.
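`Association.delete` identifies an association by `source_arn` and `destination_arn` alone, which suggests the pair acts as the association's key. Assuming that, the sketch below keeps a hypothetical local ledger (`AssociationLedger` is not a library class) that rejects unknown association types and skips pairs it has already recorded, so an existing association is not created twice.

```python
from typing import Dict, List, Tuple

# The five association types documented for Association
ASSOCIATION_TYPES = {"ContributedTo", "AssociatedWith", "DerivedFrom", "Produced", "SameAs"}


class AssociationLedger:
    """Hypothetical local record of associations, keyed by (source_arn, destination_arn)."""

    def __init__(self) -> None:
        self._edges: Dict[Tuple[str, str], str] = {}

    def create(self, source_arn: str, destination_arn: str, association_type: str) -> bool:
        """Record an association; return False if the pair is already recorded."""
        if association_type not in ASSOCIATION_TYPES:
            raise ValueError(f"unknown association_type: {association_type!r}")
        key = (source_arn, destination_arn)
        if key in self._edges:
            return False
        self._edges[key] = association_type
        return True

    def delete(self, source_arn: str, destination_arn: str) -> None:
        # Only the relationship is dropped; the entities themselves are untouched
        self._edges.pop((source_arn, destination_arn), None)

    def upstream_of(self, arn: str) -> List[str]:
        """Direct sources pointing at arn (one hop of an upstream query)."""
        return sorted(src for (src, dst) in self._edges if dst == arn)


ledger = AssociationLedger()
ledger.create("dataset", "training", "ContributedTo")
ledger.create("training", "model", "Produced")
print(ledger.create("training", "model", "Produced"))  # False: pair already recorded
print(ledger.upstream_of("model"))  # ['training']
```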
class Association:
"""
Lineage association between artifacts and actions.
Parameters:
source_arn: str - Source ARN (artifact or action) (required)
destination_arn: str - Destination ARN (artifact or action) (required)
association_type: str - Association type (required)
- "ContributedTo": Source contributed to destination
- "AssociatedWith": Generic association
- "DerivedFrom": Destination derived from source
- "Produced": Source produced destination
- "SameAs": Source and destination are same
sagemaker_session: Optional[Session] - SageMaker session
Methods:
create(source_arn, destination_arn, association_type, sagemaker_session=None) -> Association
Create association.
Parameters:
source_arn: str - Source ARN (required)
destination_arn: str - Destination ARN (required)
association_type: str - Type (required)
sagemaker_session: Optional[Session] - Session
Returns:
Association: Created association
Raises:
ValueError: Invalid association_type
ClientError: If entities don't exist
list(source_arn=None, destination_arn=None, association_type=None,
created_after=None, created_before=None, sort_by="CreationTime",
sort_order="Descending", max_results=100, sagemaker_session=None) -> List[Association]
List associations.
Returns:
List[Association]: Filtered associations
delete(source_arn, destination_arn, sagemaker_session=None) -> None
Delete specific association.
Parameters:
source_arn: str - Source ARN (required)
destination_arn: str - Destination ARN (required)
sagemaker_session: Optional[Session] - Session
Association Types:
ContributedTo: Source contributed to destination
- Example: Dataset contributed to Training
AssociatedWith: Generic association
- Example: Model associated with Experiment
DerivedFrom: Destination derived from source
- Example: Model v2 derived from Model v1
Produced: Source produced destination
- Example: Training produced Model
SameAs: Source and destination are same entity
- Example: the same entity tracked in different lineage domains
Attributes:
source_arn: str - Source ARN
destination_arn: str - Destination ARN
association_type: str - Association type
Notes:
- Build directed graph of ML workflow
- Query lineage upstream (what produced this) or downstream (what this produced)
- Multiple associations per entity allowed
- Deletion doesn't affect entities, only relationship
"""
Usage:
from sagemaker.core.lineage import Artifact, Association
# Dataset contributed to training
Association.create(
source_arn=dataset_artifact.artifact_arn,
destination_arn=training_action.action_arn,
association_type="ContributedTo"
)
# Training produced model
model_artifact = Artifact.create(
artifact_name="churn-model-v1",
artifact_type="Model",
source={"source_uri": "s3://bucket/model.tar.gz"}
)
Association.create(
source_arn=training_action.action_arn,
destination_arn=model_artifact.artifact_arn,
association_type="Produced"
)
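# Model derived from previous version
previous_model_arn = "arn:aws:sagemaker:us-west-2:123:artifact/previous-model"  # placeholder ARN
previous_model = Artifact.load(artifact_arn=previous_model_arn)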
Association.create(
source_arn=previous_model.artifact_arn,
destination_arn=model_artifact.artifact_arn,
association_type="DerivedFrom"
)
# List associations for artifact
associations = Association.list(
source_arn=dataset_artifact.artifact_arn,
association_type="ContributedTo"
)
print(f"Dataset used in {len(associations)} training jobs")
# Delete association
Association.delete(
source_arn=dataset_artifact.artifact_arn,
destination_arn=training_action.action_arn
)
Group related artifacts and actions into contexts.
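Contexts take artifacts and actions through separate methods (`add_artifact` and `add_action`), so code holding a mixed list of ARNs has to route each one. A minimal sketch, assuming ARNs of the form `arn:partition:sagemaker:region:account:<resource-type>/<name>`. Parsing the resource-type segment is more robust than searching the whole ARN for the word `artifact`, which would misroute an action whose name happens to contain it. `resource_type` and `route_for_context` are hypothetical helpers.

```python
from typing import Dict, List


def resource_type(arn: str) -> str:
    """Return the resource-type segment of a SageMaker ARN, e.g. "artifact" or "action"."""
    parts = arn.split(":", 5)  # arn, partition, service, region, account, type/name
    if len(parts) != 6 or "/" not in parts[5]:
        raise ValueError(f"not a resource ARN: {arn!r}")
    return parts[5].split("/", 1)[0]


def route_for_context(arns: List[str]) -> Dict[str, List[str]]:
    """Split ARNs into those for add_artifact and those for add_action."""
    routed: Dict[str, List[str]] = {"add_artifact": [], "add_action": []}
    for arn in arns:
        kind = resource_type(arn)
        if kind == "artifact":
            routed["add_artifact"].append(arn)
        elif kind == "action":
            routed["add_action"].append(arn)
        else:
            raise ValueError(f"cannot add a {kind!r} resource to a context: {arn}")
    return routed


arns = [
    "arn:aws:sagemaker:us-west-2:123:artifact/raw-data",
    # An action whose name contains "artifact": a substring check would misroute it
    "arn:aws:sagemaker:us-west-2:123:action/train-artifact-model",
]
print(route_for_context(arns))
```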
class Context:
"""
Lineage context for grouping related entities.
Parameters:
context_name: Optional[str] - Context name
- Auto-generated if not provided
context_type: str - Context type (required)
- Examples: "Experiment", "Pipeline", "Project", "Endpoint"
source: Optional[Dict] - Source information
properties: Optional[Dict[str, str]] - Context properties
tags: Optional[List[Tag]] - Resource tags
sagemaker_session: Optional[Session] - SageMaker session
Methods:
create(context_name=None, context_type, source=None, properties=None,
tags=None, sagemaker_session=None) -> Context
Create new context.
Returns:
Context: Created context
load(context_arn, sagemaker_session=None) -> Context
Load existing context.
Returns:
Context: Loaded context
list(context_type=None, created_after=None, created_before=None,
sort_by="CreationTime", sort_order="Descending", max_results=100,
sagemaker_session=None) -> List[Context]
List contexts.
Returns:
List[Context]: Filtered contexts
save() -> None
Save context changes.
delete() -> None
Delete context (associated artifacts and actions are not deleted).
add_artifact(artifact_arn, association_type="AssociatedWith") -> None
Add artifact to context.
Parameters:
artifact_arn: str - Artifact ARN (required)
association_type: str - Association type (default: "AssociatedWith")
add_action(action_arn, association_type="AssociatedWith") -> None
Add action to context.
Parameters:
action_arn: str - Action ARN (required)
association_type: str - Association type (default: "AssociatedWith")
Attributes:
context_arn: str - Context ARN
context_name: str - Context name
context_type: str - Context type
properties: Dict[str, str] - Context properties (mutable)
source: Optional[Dict] - Source information
Notes:
- Group related lineage entities
- Query all entities in context
- Useful for project/pipeline organization
- Deleting context doesn't delete entities
"""
Usage:
from sagemaker.core.lineage import Context
# Create pipeline context
pipeline_context = Context.create(
context_name="customer-churn-pipeline-v1",
context_type="Pipeline",
properties={
"pipeline_version": "v1.0",
"environment": "production",
"owner": "data-science-team",
"schedule": "daily"
},
tags=[{"Key": "Project", "Value": "CustomerChurn"}]
)
# Add all pipeline artifacts and actions (assumes these Artifact and Action objects were created earlier)
pipeline_context.add_artifact(raw_dataset_artifact.artifact_arn, "AssociatedWith")
pipeline_context.add_action(preprocess_action.action_arn, "AssociatedWith")
pipeline_context.add_artifact(processed_dataset_artifact.artifact_arn, "AssociatedWith")
pipeline_context.add_action(training_action.action_arn, "AssociatedWith")
pipeline_context.add_artifact(model_artifact.artifact_arn, "AssociatedWith")
pipeline_context.add_action(deployment_action.action_arn, "AssociatedWith")
print(f"Pipeline context created: {pipeline_context.context_arn}")
# List all pipeline contexts
pipeline_contexts = Context.list(
context_type="Pipeline",
sort_by="CreationTime",
sort_order="Descending"
)
print("Recent pipelines:")
for ctx in pipeline_contexts[:5]:
    print(f" {ctx.context_name}: {ctx.properties.get('environment')}")
class LineageQuery:
"""
Query lineage graph.
Parameters:
start_arns: List[str] - Starting ARNs for query (required)
- Artifact ARNs, Action ARNs, or Context ARNs
direction: LineageQueryDirectionEnum - Query direction (required)
- ASCENDANTS: Query upstream (what led to this)
- DESCENDANTS: Query downstream (what this led to)
- BOTH: Query in both directions
include_edges: bool - Include edges in result (default: True)
filters: Optional[LineageFilter] - Query filters
max_depth: Optional[int] - Maximum traversal depth (default: 10)
- Range: 1-100
sagemaker_session: Optional[Session] - SageMaker session
Methods:
query() -> Dict
Execute query and return lineage graph.
Returns:
Dict: Lineage graph with Vertices and Edges
Structure:
{
"Vertices": [
{
"Arn": "...",
"Type": "Artifact" | "Action" | "Context",
"Properties": {...}
},
...
],
"Edges": [
{
"SourceArn": "...",
"DestinationArn": "...",
"AssociationType": "..."
},
...
]
}
Raises:
ValueError: Invalid start_arns or direction
ClientError: Query execution errors
Notes:
- Returns complete subgraph up to max_depth
- Vertices are nodes (artifacts, actions, contexts)
- Edges are associations between nodes
- Use filters to focus query
- Large graphs can be slow; a smaller max_depth bounds the traversal
"""
Usage:
from sagemaker.core.lineage import LineageQuery, LineageQueryDirectionEnum
# Query downstream lineage (what this artifact produced)
query = LineageQuery(
start_arns=[dataset_artifact.artifact_arn],
direction=LineageQueryDirectionEnum.DESCENDANTS,
include_edges=True,
max_depth=10
)
result = query.query()
print("Lineage graph:")
print(f" Vertices: {len(result['Vertices'])}")
print(f" Edges: {len(result['Edges'])}")
# Analyze results
for vertex in result['Vertices']:
    props = vertex.get('Properties', {})
    name = props.get('artifact_name') or props.get('action_name')
    print(f"\n{vertex['Type']}: {name}")
# Query upstream lineage (what produced this artifact)
upstream_query = LineageQuery(
start_arns=[model_artifact.artifact_arn],
direction=LineageQueryDirectionEnum.ASCENDANTS,
include_edges=True,
max_depth=10
)
upstream = upstream_query.query()
print("\nModel trained from:")
for vertex in upstream['Vertices']:
    props = vertex.get('Properties', {})
    if vertex['Type'] == 'Artifact' and 'Dataset' in props.get('artifact_type', ''):
        print(f" - Dataset: {props.get('artifact_name')}")
# Query in both directions (complete lineage)
full_query = LineageQuery(
start_arns=[training_action.action_arn],
direction=LineageQueryDirectionEnum.BOTH,
include_edges=True
)
full_lineage = full_query.query()
class LineageFilter:
"""
Filter for lineage queries.
Parameters:
entities: Optional[List[LineageEntityEnum]] - Entity types to include
- [LineageEntityEnum.ARTIFACT, LineageEntityEnum.ACTION]
sources: Optional[List[Dict]] - Source filters
created_before: Optional[datetime] - Created before timestamp
created_after: Optional[datetime] - Created after timestamp
modified_before: Optional[datetime] - Modified before timestamp
modified_after: Optional[datetime] - Modified after timestamp
properties: Optional[Dict[str, str]] - Property filters
- Match entities with specific properties
Usage:
Filter lineage query results to focus on relevant entities.
Notes:
- All filters combined with AND logic
- Empty filter = no filtering
- Property filters match exact values
"""
class LineageEntityEnum(Enum):
"""
Lineage entity types.
Values:
ARTIFACT = "Artifact" - Artifact entity
ACTION = "Action" - Action entity
CONTEXT = "Context" - Context entity
ASSOCIATION = "Association" - Association entity
Usage:
Specify entity types in LineageFilter.
"""
class LineageQueryDirectionEnum(Enum):
"""
Query direction enumeration.
Values:
ASCENDANTS = "Ascendants"
Query upstream (what led to this)
Example: Find all datasets used to train a model
DESCENDANTS = "Descendants"
Query downstream (what this led to)
Example: Find all models trained from a dataset
BOTH = "Both"
Query in both directions
Example: Complete workflow including upstream and downstream
Usage:
Specify direction in LineageQuery.
Notes:
- ASCENDANTS: trace back to data sources
- DESCENDANTS: trace forward to deployments
- BOTH: complete lineage graph
"""
class LineageTableVisualizer:
"""
Visualize lineage as table.
Methods:
show(lineage_graph) -> None
Display lineage in table format.
Parameters:
lineage_graph: Dict - Lineage graph from query()
Usage:
Render lineage query results as formatted table for analysis.
Notes:
- Text-based table visualization
- Shows entities and relationships
- Use in Jupyter notebooks
"""
Usage:
from sagemaker.core.lineage import LineageQuery, LineageQueryDirectionEnum, LineageTableVisualizer
# Query lineage
query = LineageQuery(
start_arns=[model_artifact.artifact_arn],
direction=LineageQueryDirectionEnum.BOTH
)
graph = query.query()
# Visualize
visualizer = LineageTableVisualizer()
visualizer.show(graph)
# Output example:
# +----------------+----------------------+-------------------+
# | Type           | Name                 | ARN               |
# +----------------+----------------------+-------------------+
# | Artifact       | training-dataset-v1  | arn:aws:...       |
# | Action         | train-model-v1       | arn:aws:...       |
# | Artifact       | trained-model-v1     | arn:aws:...       |
# +----------------+----------------------+-------------------+
from sagemaker.core.lineage import Artifact, Action, Association, Context
# Create pipeline context
pipeline_context = Context.create(
context_name="ml-pipeline-2024-01-15",
context_type="Pipeline",
properties={
"version": "v2.0",
"environment": "production"
}
)
# Track entire workflow
# 1. Raw data
raw_data = Artifact.create(
artifact_name="raw-customer-data-2024-01-15",
artifact_type="Dataset",
source={"source_uri": "s3://bucket/raw/2024-01-15/"},
properties={
"source": "CRM_system",
"record_count": "50000",
"date": "2024-01-15"
}
)
# 2. Preprocessing action
preprocess = Action.create(
action_name="preprocess-2024-01-15",
action_type="Processing",
properties={
"processor": "spark",
"transformations": "cleaning,feature_engineering,splitting"
},
status="Completed"
)
Association.create(raw_data.artifact_arn, preprocess.action_arn, "ContributedTo")
# 3. Processed datasets
processed_data = Artifact.create(
artifact_name="processed-data-2024-01-15",
artifact_type="Dataset",
source={"source_uri": "s3://bucket/processed/2024-01-15/"},
properties={
"train_samples": "35000",
"val_samples": "7500",
"test_samples": "7500"
}
)
Association.create(preprocess.action_arn, processed_data.artifact_arn, "Produced")
# 4. Training action
train = Action.create(
action_name="train-2024-01-15",
action_type="Training",
properties={
"algorithm": "xgboost",
"instance_type": "ml.m5.2xlarge",
"training_time_seconds": "1200"
},
status="Completed"
)
Association.create(processed_data.artifact_arn, train.action_arn, "ContributedTo")
# 5. Model artifact
model = Artifact.create(
artifact_name="churn-model-2024-01-15",
artifact_type="Model",
source={"source_uri": "s3://bucket/models/2024-01-15/model.tar.gz"},
properties={
"accuracy": "0.94",
"f1_score": "0.92",
"framework": "xgboost",
"version": "1.7.3"
}
)
Association.create(train.action_arn, model.artifact_arn, "Produced")
# 6. Add all to pipeline context, routing each entity to the matching method
for artifact in [raw_data, processed_data, model]:
    pipeline_context.add_artifact(artifact.artifact_arn, "AssociatedWith")
for action in [preprocess, train]:
    pipeline_context.add_action(action.action_arn, "AssociatedWith")
print(f"Complete pipeline tracked in context: {pipeline_context.context_arn}")
# Track model versions with approval workflow
model_v1 = Artifact.create(
artifact_name="customer-churn-model-v1.0",
artifact_type="Model",
source={"source_uri": "s3://models/churn-v1.0.tar.gz"},
properties={
"accuracy": "0.85",
"approved": "true",
"approver": "john.doe@company.com",
"approval_date": "2024-01-10",
"production_deployment": "endpoint-prod-v1"
}
)
# New version with improvements
model_v2 = Artifact.create(
artifact_name="customer-churn-model-v2.0",
artifact_type="Model",
source={"source_uri": "s3://models/churn-v2.0.tar.gz"},
properties={
"accuracy": "0.89",
"f1_score": "0.87",
"approved": "pending",
"reviewer": "jane.smith@company.com",
"improvements": "better_feature_engineering,larger_dataset"
}
)
# Link versions
Association.create(
model_v1.artifact_arn,
model_v2.artifact_arn,
"DerivedFrom"
)
# Query all model versions
models = Artifact.list(
artifact_type="Model",
sort_by="CreationTime",
sort_order="Descending"
)
print("Model version history:")
for model in models:
    approval = model.properties.get("approved", "unknown")
    accuracy = model.properties.get("accuracy", "N/A")
    print(f" {model.artifact_name}: accuracy={accuracy}, approved={approval}")
# Query complete data lineage for production model
from sagemaker.core.lineage import LineageQuery, LineageQueryDirectionEnum
prod_model_arn = "arn:aws:sagemaker:us-west-2:123:artifact/production-model"
# Find all data sources
provenance_query = LineageQuery(
start_arns=[prod_model_arn],
direction=LineageQueryDirectionEnum.ASCENDANTS,
max_depth=20
)
lineage = provenance_query.query()
# Extract datasets
datasets = []
for vertex in lineage['Vertices']:
    if vertex.get('Type') == 'Artifact':
        props = vertex.get('Properties', {})
        if 'Dataset' in props.get('artifact_type', ''):
            datasets.append({
                'name': props.get('artifact_name'),
                'uri': props.get('source_uri'),
                'samples': props.get('num_samples')
            })
print(f"Production model traces back to {len(datasets)} datasets:")
for ds in datasets:
    print(f" - {ds['name']}: {ds['samples']} samples")
    print(f"   Source: {ds['uri']}")
from datetime import datetime, timedelta
from sagemaker.core.lineage import Artifact, LineageQuery, LineageQueryDirectionEnum
# Find all models created in last 30 days
thirty_days_ago = datetime.now() - timedelta(days=30)
recent_models = Artifact.list(
artifact_type="Model",
created_after=thirty_days_ago,
sort_by="CreationTime",
sort_order="Descending"
)
# Generate compliance report
print("=== Model Compliance Report ===\n")
for model in recent_models:
    print(f"Model: {model.artifact_name}")
    print(f" Created: {model.creation_time}")
    print(f" Approval: {model.properties.get('approved', 'unknown')}")
    # Query upstream lineage
    query = LineageQuery(
        start_arns=[model.artifact_arn],
        direction=LineageQueryDirectionEnum.ASCENDANTS
    )
    lineage = query.query()
    # Count entities
    datasets = sum(1 for v in lineage['Vertices']
                   if v['Type'] == 'Artifact' and 'Dataset' in v.get('Properties', {}).get('artifact_type', ''))
    training_jobs = sum(1 for v in lineage['Vertices']
                        if v['Type'] == 'Action' and v.get('Properties', {}).get('action_type') == 'Training')
    print(f" Data sources: {datasets}")
    print(f" Training jobs: {training_jobs}")
    # Check for required approvals
    if model.properties.get('approved') != 'true':
        print(" WARNING: Model not approved for production!")
    print()
# Given a model, reproduce its training
import json
from sagemaker.core.lineage import LineageQuery, LineageQueryDirectionEnum
model_arn = "arn:aws:sagemaker:us-west-2:123:artifact/model-abc"
# Query complete upstream lineage
query = LineageQuery(
start_arns=[model_arn],
direction=LineageQueryDirectionEnum.ASCENDANTS,
include_edges=True
)
lineage = query.query()
# Extract training details
training_info = {}
for vertex in lineage['Vertices']:
    props = vertex.get('Properties', {})
    if vertex['Type'] == 'Action' and props.get('action_type') == 'Training':
        # Extract the training configuration recorded on the action
        # (update() keeps a dataset_uri found earlier in the loop)
        training_info.update({
            'algorithm': props.get('algorithm'),
            'hyperparameters': json.loads(props.get('hyperparameters', '{}')),
            'instance_type': props.get('instance_type'),
            'training_time': props.get('training_time_seconds')
        })
    elif vertex['Type'] == 'Artifact' and 'Dataset' in props.get('artifact_type', ''):
        # Record the training dataset location (the last dataset vertex seen wins)
        training_info['dataset_uri'] = props.get('source_uri')
print("Training configuration for reproduction:")
print(json.dumps(training_info, indent=2))
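# Use to reproduce training
# Assumed import path for ModelTrainer and its configs; adjust to your SDK version
from sagemaker.train import ModelTrainer
from sagemaker.train.configs import Compute, InputData
# The recorded "algorithm" property is a name such as "xgboost", not a container image,
# so resolve it to a training image URI first (placeholder value below)
training_image = "<training-image-uri>"
trainer = ModelTrainer(
training_image=training_image,
compute=Compute(instance_type=training_info['instance_type']),
hyperparameters=training_info['hyperparameters']
)
train_data = InputData(
channel_name="training",
data_source=training_info['dataset_uri']
)
# Reproduce training
trainer.train(input_data_config=[train_data])
Circular Association: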
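A circular association would let a lineage walk return to its starting entity, for example a model linked back to the dataset it was trained from. One client-side guard, sketched under the assumption that you already have the existing `(source, destination)` pairs (for instance from `Association.list`): before adding `source -> destination`, check whether `destination` can already reach `source`. `would_create_cycle` is a hypothetical helper, not a library function.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, Set, Tuple


def would_create_cycle(edges: Iterable[Tuple[str, str]], source: str, destination: str) -> bool:
    """True if adding source -> destination closes a loop, i.e. destination already reaches source."""
    if source == destination:
        return True
    graph: Dict[str, Set[str]] = defaultdict(set)
    for src, dst in edges:
        graph[src].add(dst)
    # Breadth-first search downstream from the proposed destination
    seen, frontier = {destination}, deque([destination])
    while frontier:
        node = frontier.popleft()
        for nxt in graph[node]:
            if nxt == source:
                return True
            if nxt not in seen:
                seen.add(nxt)
                frontier.append(nxt)
    return False


edges = [("dataset", "training"), ("training", "model")]
print(would_create_cycle(edges, "model", "dataset"))   # True: dataset -> training -> model exists
print(would_create_cycle(edges, "model", "endpoint"))  # False
```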
Entity Not Found:
Query Timeout:
Too Many Results:
Property Update Failed:
Association Already Exists: