Lineage Tracking

Track ML workflow lineage with artifacts, actions, associations, and contexts for governance and reproducibility.

Capabilities

Artifact

Track data artifacts throughout the ML workflow.

class Artifact:
    """
    Lineage artifact for data tracking.

    Parameters:
        artifact_name: Optional[str] - Artifact name
            - 1-120 characters
            - Auto-generated if not provided
        artifact_type: str - Artifact type (required)
            - Examples: "Dataset", "Model", "Image", "DatasetSnapshot"
            - Custom types allowed
        source: Optional[Dict] - Source information with source_uri
            - Structure: {"source_uri": "s3://...", "source_types": [...]}
        properties: Optional[Dict[str, str]] - Artifact properties
            - Metadata key-value pairs
            - Maximum 30 properties
        tags: Optional[List[Tag]] - Resource tags
        sagemaker_session: Optional[Session] - SageMaker session

    Methods:
        create(artifact_name=None, artifact_type, source=None, properties=None, 
               tags=None, sagemaker_session=None) -> Artifact
            Create new artifact.
            
            Parameters:
                artifact_name: Optional[str] - Name
                artifact_type: str - Type (required)
                source: Optional[Dict] - Source info
                properties: Optional[Dict] - Properties
                tags: Optional[List[Tag]] - Tags
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                Artifact: Created artifact
            
            Raises:
                ValueError: Invalid artifact_type or properties
                ClientError: If artifact already exists
        
        load(artifact_arn, sagemaker_session=None) -> Artifact
            Load existing artifact by ARN.
            
            Parameters:
                artifact_arn: str - Artifact ARN (required)
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                Artifact: Loaded artifact
            
            Raises:
                ClientError: If artifact not found
        
        list(source_uri=None, artifact_type=None, created_after=None, 
             created_before=None, sort_by="CreationTime", sort_order="Descending", 
             max_results=100, sagemaker_session=None) -> List[Artifact]
            List artifacts with filtering.
            
            Parameters:
                source_uri: Optional[str] - Filter by source URI
                artifact_type: Optional[str] - Filter by type
                created_after: Optional[datetime] - Filter by creation time
                created_before: Optional[datetime] - Filter by creation time
                sort_by: str - Sort field (default: "CreationTime")
                sort_order: str - Sort order (default: "Descending")
                max_results: int - Maximum results (1-100)
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                List[Artifact]: Filtered artifacts list
        
        save() -> None
            Save artifact changes (properties, tags).
            
            Raises:
                ClientError: If update fails
        
        delete() -> None
            Delete artifact and all associations.
            
            Raises:
                ClientError: If deletion fails
        
        set_tag(tag) -> None
            Add single tag.
            
            Parameters:
                tag: Dict - Tag dictionary with Key and Value
        
        set_tags(tags) -> None
            Set multiple tags.
            
            Parameters:
                tags: List[Dict] - Tags list

    Attributes:
        artifact_arn: str - Artifact ARN
        artifact_name: str - Artifact name
        artifact_type: str - Artifact type
        source: Dict - Source information
        properties: Dict[str, str] - Artifact properties (mutable)
        creation_time: datetime - Creation timestamp
        created_by: Dict - Creator information
        last_modified_time: datetime - Last modification timestamp
        last_modified_by: Dict - Last modifier information
    
    Notes:
        - Track datasets, models, images, configs, etc.
        - Properties mutable via save()
        - Tags for organization and cost tracking
        - delete() removes the artifact's associations before removing the artifact
    """

Usage:

from sagemaker.core.lineage import Artifact

# Create dataset artifact
dataset_artifact = Artifact.create(
    artifact_name="training-dataset-v1",
    artifact_type="Dataset",
    source={
        "source_uri": "s3://my-bucket/datasets/train.csv",
        "source_types": [
            {"SourceIdType": "S3", "Value": "s3://my-bucket/datasets/train.csv"}
        ]
    },
    properties={
        "num_samples": "10000",
        "features": "age,income,education,occupation",
        "target": "churn",
        "split": "train",
        "version": "1.0"
    },
    tags=[
        {"Key": "Project", "Value": "CustomerChurn"},
        {"Key": "DataSource", "Value": "CRM"}
    ]
)

print(f"Artifact created: {dataset_artifact.artifact_arn}")

# Load existing artifact
artifact = Artifact.load(
    artifact_arn="arn:aws:sagemaker:us-west-2:123:artifact/abc123"
)

# List artifacts by source
datasets = Artifact.list(
    source_uri="s3://my-bucket/datasets",
    artifact_type="Dataset",
    sort_by="CreationTime",
    sort_order="Descending",
    max_results=50
)

print(f"Found {len(datasets)} dataset artifacts")

# Update properties
artifact.properties["version"] = "2.0"
artifact.properties["last_updated"] = "2024-01-15"
artifact.save()

# Delete artifact
# artifact.delete()
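
The `source` argument in the example above repeats the same URI in `source_uri` and in `source_types`. A small helper can build that dictionary so the two values stay in sync. This is only a sketch: `make_source` is a hypothetical name that is not part of the SDK, and the default `SourceIdType` value simply mirrors the example above.

```python
from typing import Dict, List, Union

SourceDict = Dict[str, Union[str, List[Dict[str, str]]]]


def make_source(uri: str, source_id_type: str = "S3") -> SourceDict:
    """Build a source dict shaped like the one in the Artifact.create example.

    Hypothetical helper; the SourceIdType default is copied from the example,
    not taken from an authoritative list of accepted values.
    """
    if not uri:
        raise ValueError("uri must be a non-empty string")
    return {
        "source_uri": uri,
        "source_types": [{"SourceIdType": source_id_type, "Value": uri}],
    }


source = make_source("s3://my-bucket/datasets/train.csv")
print(source["source_uri"])
```

The result can be passed as `source=` when creating an artifact.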

Action

Track actions performed in the ML workflow.

class Action:
    """
    Lineage action for workflow steps.

    Parameters:
        action_name: Optional[str] - Action name
            - Auto-generated if not provided
        action_type: str - Action type (required)
            - Examples: "Training", "Processing", "Transform", "Deployment"
        source: Optional[Dict] - Source information
        properties: Optional[Dict[str, str]] - Action properties
        status: Optional[str] - Action status
            - "InProgress", "Completed", "Failed", "Stopped"
        tags: Optional[List[Tag]] - Resource tags
        sagemaker_session: Optional[Session] - SageMaker session

    Methods:
        create(action_name=None, action_type, source=None, properties=None, 
               status=None, tags=None, sagemaker_session=None) -> Action
            Create new action.
            
            Returns:
                Action: Created action
        
        load(action_arn, sagemaker_session=None) -> Action
            Load existing action.
            
            Returns:
                Action: Loaded action
        
        list(source_uri=None, action_type=None, created_after=None, 
             created_before=None, sort_by="CreationTime", sort_order="Descending", 
             max_results=100, sagemaker_session=None) -> List[Action]
            List actions with filtering.
            
            Returns:
                List[Action]: Filtered actions
        
        save() -> None
            Save action changes.
        
        delete() -> None
            Delete action.
        
        set_tag(tag) -> None
            Add tag.
        
        set_tags(tags) -> None
            Set multiple tags.

    Attributes:
        action_arn: str - Action ARN
        action_name: str - Action name
        action_type: str - Action type
        status: Optional[str] - Action status
        properties: Dict[str, str] - Action properties (mutable)
    
    Notes:
        - Track training jobs, processing jobs, deployments
        - Status tracks action lifecycle
        - Properties for algorithm, hyperparameters, etc.
    """

Usage:

import json

from sagemaker.core.lineage import Action

# Create training action
training_action = Action.create(
    action_name="train-xgboost-v1",
    action_type="Training",
    source={
        "source_uri": "arn:aws:sagemaker:us-west-2:123:training-job/my-job",
        "source_types": [
            {"SourceIdType": "ARN", "Value": "arn:aws:sagemaker:..."}
        ]
    },
    properties={
        "algorithm": "xgboost",
        "instance_type": "ml.m5.xlarge",
        "hyperparameters": json.dumps({
            "max_depth": 5,
            "eta": 0.2,
            "num_round": 100
        })
    },
    status="Completed",
    tags=[{"Key": "Project", "Value": "Churn"}]
)

# Update status during execution
training_action.status = "InProgress"
training_action.save()

# Complete action
training_action.status = "Completed"
training_action.properties["accuracy"] = "0.94"
training_action.save()

# List training actions (created_after is documented as a datetime, not a string)
from datetime import datetime

training_actions = Action.list(
    action_type="Training",
    created_after=datetime(2024, 1, 1),
    sort_by="CreationTime"
)
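
Properties are typed `Dict[str, str]`, which is why the example above serializes the hyperparameters with `json.dumps`. The sketch below generalizes that step for any mix of values. `to_properties` is a hypothetical helper, not an SDK function.

```python
import json
from typing import Any, Dict


def to_properties(values: Dict[str, Any]) -> Dict[str, str]:
    """Convert arbitrary values into the string-only mapping that properties expect."""
    props: Dict[str, str] = {}
    for key, value in values.items():
        if isinstance(value, str):
            props[key] = value
        else:
            # Numbers, lists, and dicts are stored as JSON text so they can be
            # recovered later with json.loads.
            props[key] = json.dumps(value, sort_keys=True)
    return props


props = to_properties({
    "algorithm": "xgboost",
    "accuracy": 0.94,
    "hyperparameters": {"max_depth": 5, "eta": 0.2},
})
print(props)
```

Reading a value back is the reverse step, for example `json.loads(props["hyperparameters"])`.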

Association

Create relationships between artifacts and actions.

class Association:
    """
    Lineage association between artifacts and actions.

    Parameters:
        source_arn: str - Source ARN (artifact or action) (required)
        destination_arn: str - Destination ARN (artifact or action) (required)
        association_type: str - Association type (required)
            - "ContributedTo": Source contributed to destination
            - "AssociatedWith": Generic association
            - "DerivedFrom": Destination derived from source
            - "Produced": Source produced destination
            - "SameAs": Source and destination are same
        sagemaker_session: Optional[Session] - SageMaker session

    Methods:
        create(source_arn, destination_arn, association_type, sagemaker_session=None) -> Association
            Create association.
            
            Parameters:
                source_arn: str - Source ARN (required)
                destination_arn: str - Destination ARN (required)
                association_type: str - Type (required)
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                Association: Created association
            
            Raises:
                ValueError: Invalid association_type
                ClientError: If entities don't exist
        
        list(source_arn=None, destination_arn=None, association_type=None, 
             created_after=None, created_before=None, sort_by="CreationTime", 
             sort_order="Descending", max_results=100, sagemaker_session=None) -> List[Association]
            List associations.
            
            Returns:
                List[Association]: Filtered associations
        
        delete(source_arn, destination_arn, sagemaker_session=None) -> None
            Delete specific association.
            
            Parameters:
                source_arn: str - Source ARN (required)
                destination_arn: str - Destination ARN (required)
                sagemaker_session: Optional[Session] - Session

    Association Types:
        ContributedTo: Source contributed to destination
            - Example: Dataset contributed to Training
        AssociatedWith: Generic association
            - Example: Model associated with Experiment
        DerivedFrom: Destination derived from source
            - Example: Model v2 derived from Model v1
        Produced: Source produced destination
            - Example: Training produced Model
        SameAs: Source and destination are same entity
            - Example: Link different representations

    Attributes:
        source_arn: str - Source ARN
        destination_arn: str - Destination ARN
        association_type: str - Association type
    
    Notes:
        - Build directed graph of ML workflow
        - Query lineage upstream (what produced this) or downstream (what this produced)
        - Multiple associations per entity allowed
        - Deletion doesn't affect entities, only relationship
    """

Usage:

from sagemaker.core.lineage import Artifact, Association

# Dataset contributed to training
Association.create(
    source_arn=dataset_artifact.artifact_arn,
    destination_arn=training_action.action_arn,
    association_type="ContributedTo"
)

# Training produced model
model_artifact = Artifact.create(
    artifact_name="churn-model-v1",
    artifact_type="Model",
    source={"source_uri": "s3://bucket/model.tar.gz"}
)

Association.create(
    source_arn=training_action.action_arn,
    destination_arn=model_artifact.artifact_arn,
    association_type="Produced"
)

# Model derived from previous version
# previous_model_arn is a placeholder for the ARN of the earlier model artifact
previous_model = Artifact.load(previous_model_arn)
Association.create(
    source_arn=previous_model.artifact_arn,
    destination_arn=model_artifact.artifact_arn,
    association_type="DerivedFrom"
)

# List associations for artifact
associations = Association.list(
    source_arn=dataset_artifact.artifact_arn,
    association_type="ContributedTo"
)

print(f"Dataset used in {len(associations)} training jobs")

# Delete association
Association.delete(
    source_arn=dataset_artifact.artifact_arn,
    destination_arn=training_action.action_arn
)
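
Each association is a directed edge from `source_arn` to `destination_arn`, so a set of associations forms a graph. The sketch below models that with plain tuples so it runs without AWS access. The short names stand in for real ARNs, and `build_adjacency` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Edge = Tuple[str, str, str]  # (source, destination, association_type)

VALID_TYPES = {"ContributedTo", "AssociatedWith", "DerivedFrom", "Produced", "SameAs"}


def build_adjacency(edges: List[Edge]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
    """Return (downstream, upstream) one-hop neighbour maps for the given edges."""
    downstream: Dict[str, List[str]] = defaultdict(list)
    upstream: Dict[str, List[str]] = defaultdict(list)
    for source, destination, association_type in edges:
        if association_type not in VALID_TYPES:
            raise ValueError(f"Unknown association type: {association_type}")
        downstream[source].append(destination)
        upstream[destination].append(source)
    return dict(downstream), dict(upstream)


edges = [
    ("dataset", "training", "ContributedTo"),
    ("training", "model", "Produced"),
]
downstream, upstream = build_adjacency(edges)
print(downstream.get("dataset", []))  # what the dataset fed into
print(upstream.get("model", []))      # what produced the model
```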

Context

Group related artifacts and actions into contexts.

class Context:
    """
    Lineage context for grouping related entities.

    Parameters:
        context_name: Optional[str] - Context name
            - Auto-generated if not provided
        context_type: str - Context type (required)
            - Examples: "Experiment", "Pipeline", "Project", "Endpoint"
        source: Optional[Dict] - Source information
        properties: Optional[Dict[str, str]] - Context properties
        tags: Optional[List[Tag]] - Resource tags
        sagemaker_session: Optional[Session] - SageMaker session

    Methods:
        create(context_name=None, context_type, source=None, properties=None, 
               tags=None, sagemaker_session=None) -> Context
            Create new context.
            
            Returns:
                Context: Created context
        
        load(context_arn, sagemaker_session=None) -> Context
            Load existing context.
            
            Returns:
                Context: Loaded context
        
        list(context_type=None, created_after=None, created_before=None, 
             sort_by="CreationTime", sort_order="Descending", max_results=100, 
             sagemaker_session=None) -> List[Context]
            List contexts.
            
            Returns:
                List[Context]: Filtered contexts
        
        save() -> None
            Save context changes.
        
        delete() -> None
            Delete context (associations preserved).
        
        add_artifact(artifact_arn, association_type="AssociatedWith") -> None
            Add artifact to context.
            
            Parameters:
                artifact_arn: str - Artifact ARN (required)
                association_type: str - Association type (default: "AssociatedWith")
        
        add_action(action_arn, association_type="AssociatedWith") -> None
            Add action to context.
            
            Parameters:
                action_arn: str - Action ARN (required)
                association_type: str - Association type (default: "AssociatedWith")

    Attributes:
        context_arn: str - Context ARN
        context_name: str - Context name
        context_type: str - Context type
        properties: Dict[str, str] - Context properties (mutable)
        source: Optional[Dict] - Source information
    
    Notes:
        - Group related lineage entities
        - Query all entities in context
        - Useful for project/pipeline organization
        - Deleting context doesn't delete entities
    """

Usage:

from sagemaker.core.lineage import Context

# Create pipeline context
pipeline_context = Context.create(
    context_name="customer-churn-pipeline-v1",
    context_type="Pipeline",
    properties={
        "pipeline_version": "v1.0",
        "environment": "production",
        "owner": "data-science-team",
        "schedule": "daily"
    },
    tags=[{"Key": "Project", "Value": "CustomerChurn"}]
)

# Add all pipeline artifacts and actions (these entities are assumed to have been created earlier)
pipeline_context.add_artifact(raw_dataset_artifact.artifact_arn, "AssociatedWith")
pipeline_context.add_action(preprocess_action.action_arn, "AssociatedWith")
pipeline_context.add_artifact(processed_dataset_artifact.artifact_arn, "AssociatedWith")
pipeline_context.add_action(training_action.action_arn, "AssociatedWith")
pipeline_context.add_artifact(model_artifact.artifact_arn, "AssociatedWith")
pipeline_context.add_action(deployment_action.action_arn, "AssociatedWith")

print(f"Pipeline context created: {pipeline_context.context_arn}")

# List all pipeline contexts
pipeline_contexts = Context.list(
    context_type="Pipeline",
    sort_by="CreationTime",
    sort_order="Descending"
)

print("Recent pipelines:")
for ctx in pipeline_contexts[:5]:
    print(f"  {ctx.context_name}: {ctx.properties.get('environment')}")

Querying Lineage

LineageQuery

class LineageQuery:
    """
    Query lineage graph.

    Parameters:
        start_arns: List[str] - Starting ARNs for query (required)
            - Artifact ARNs, Action ARNs, or Context ARNs
        direction: LineageQueryDirectionEnum - Query direction (required)
            - ASCENDANTS: Query upstream (what led to this)
            - DESCENDANTS: Query downstream (what this led to)
            - BOTH: Query in both directions
        include_edges: bool - Include edges in result (default: True)
        filters: Optional[LineageFilter] - Query filters
        max_depth: Optional[int] - Maximum traversal depth (default: 10)
            - Range: 1-100
        sagemaker_session: Optional[Session] - SageMaker session

    Methods:
        query() -> Dict
            Execute query and return lineage graph.
            
            Returns:
                Dict: Lineage graph with Vertices and Edges
                    Structure:
                    {
                        "Vertices": [
                            {
                                "Arn": "...",
                                "Type": "Artifact" | "Action" | "Context",
                                "Properties": {...}
                            },
                            ...
                        ],
                        "Edges": [
                            {
                                "SourceArn": "...",
                                "DestinationArn": "...",
                                "AssociationType": "..."
                            },
                            ...
                        ]
                    }
            
            Raises:
                ValueError: Invalid start_arns or direction
                ClientError: Query execution errors

    Notes:
        - Returns complete subgraph up to max_depth
        - Vertices are nodes (artifacts, actions, contexts)
        - Edges are associations between nodes
        - Use filters to focus query
        - Large graphs can be slow (use max_depth)
    """

Usage:

from sagemaker.core.lineage import LineageQuery, LineageQueryDirectionEnum

# Query downstream lineage (what this artifact produced)
query = LineageQuery(
    start_arns=[dataset_artifact.artifact_arn],
    direction=LineageQueryDirectionEnum.DESCENDANTS,
    include_edges=True,
    max_depth=10
)

result = query.query()

print(f"Lineage graph:")
print(f"  Vertices: {len(result['Vertices'])}")
print(f"  Edges: {len(result['Edges'])}")

# Analyze results
for vertex in result['Vertices']:
    print(f"\n{vertex['Type']}: {vertex.get('Properties', {}).get('artifact_name') or vertex.get('Properties', {}).get('action_name')}")

# Query upstream lineage (what produced this artifact)
upstream_query = LineageQuery(
    start_arns=[model_artifact.artifact_arn],
    direction=LineageQueryDirectionEnum.ASCENDANTS,
    include_edges=True,
    max_depth=10
)

upstream = upstream_query.query()

print(f"\nModel trained from:")
for vertex in upstream['Vertices']:
    if vertex['Type'] == 'Artifact' and 'Dataset' in vertex.get('Properties', {}).get('artifact_type', ''):
        print(f"  - Dataset: {vertex['Properties']['artifact_name']}")

# Query in both directions (complete lineage)
full_query = LineageQuery(
    start_arns=[training_action.action_arn],
    direction=LineageQueryDirectionEnum.BOTH,
    include_edges=True
)

full_lineage = full_query.query()
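
A quick way to inspect a query result is to count vertices by `Type` and edges by `AssociationType`. The sketch below assumes only the result shape documented above. `summarize_graph` is a hypothetical helper and `sample_graph` is hand-written sample data, not real query output.

```python
from collections import Counter
from typing import Any, Dict


def summarize_graph(graph: Dict[str, Any]) -> Dict[str, Dict[str, int]]:
    """Count vertices by Type and edges by AssociationType."""
    vertex_counts = Counter(v.get("Type", "Unknown") for v in graph.get("Vertices", []))
    edge_counts = Counter(e.get("AssociationType", "Unknown") for e in graph.get("Edges", []))
    return {"vertices": dict(vertex_counts), "edges": dict(edge_counts)}


# Hand-written sample in the documented result shape.
sample_graph = {
    "Vertices": [
        {"Arn": "dataset-arn", "Type": "Artifact", "Properties": {}},
        {"Arn": "training-arn", "Type": "Action", "Properties": {}},
        {"Arn": "model-arn", "Type": "Artifact", "Properties": {}},
    ],
    "Edges": [
        {"SourceArn": "dataset-arn", "DestinationArn": "training-arn",
         "AssociationType": "ContributedTo"},
        {"SourceArn": "training-arn", "DestinationArn": "model-arn",
         "AssociationType": "Produced"},
    ],
}

summary = summarize_graph(sample_graph)
print(summary)
```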

LineageFilter

class LineageFilter:
    """
    Filter for lineage queries.

    Parameters:
        entities: Optional[List[LineageEntityEnum]] - Entity types to include
            - [LineageEntityEnum.ARTIFACT, LineageEntityEnum.ACTION]
        sources: Optional[List[Dict]] - Source filters
        created_before: Optional[datetime] - Created before timestamp
        created_after: Optional[datetime] - Created after timestamp
        modified_before: Optional[datetime] - Modified before timestamp
        modified_after: Optional[datetime] - Modified after timestamp
        properties: Optional[Dict[str, str]] - Property filters
            - Match entities with specific properties

    Usage:
        Filter lineage query results to focus on relevant entities.
    
    Notes:
        - All filters combined with AND logic
        - Empty filter = no filtering
        - Property filters match exact values
    """

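The filter semantics noted above (criteria combined with AND, exact property matches, an empty filter selecting everything) can be illustrated on a list of vertices in the documented result shape. This is a client-side sketch of those semantics, not how `LineageFilter` is implemented. `filter_vertices` and the sample data are hypothetical.

```python
from typing import Any, Dict, List, Optional, Set


def filter_vertices(
    vertices: List[Dict[str, Any]],
    entity_types: Optional[Set[str]] = None,
    properties: Optional[Dict[str, str]] = None,
) -> List[Dict[str, Any]]:
    """Keep vertices that satisfy every supplied criterion (AND logic)."""
    selected = []
    for vertex in vertices:
        if entity_types is not None and vertex.get("Type") not in entity_types:
            continue
        vertex_props = vertex.get("Properties", {})
        # Exact match: every requested key must be present with an equal value.
        if properties and any(vertex_props.get(k) != v for k, v in properties.items()):
            continue
        selected.append(vertex)
    return selected


vertices = [
    {"Arn": "a1", "Type": "Artifact", "Properties": {"artifact_type": "Dataset", "split": "train"}},
    {"Arn": "a2", "Type": "Artifact", "Properties": {"artifact_type": "Model"}},
    {"Arn": "x1", "Type": "Action", "Properties": {"action_type": "Training"}},
]

datasets_only = filter_vertices(vertices, {"Artifact"}, {"artifact_type": "Dataset"})
print([v["Arn"] for v in datasets_only])
```
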
LineageEntityEnum

class LineageEntityEnum(Enum):
    """
    Lineage entity types.

    Values:
        ARTIFACT = "Artifact" - Artifact entity
        ACTION = "Action" - Action entity
        CONTEXT = "Context" - Context entity
        ASSOCIATION = "Association" - Association entity

    Usage:
        Specify entity types in LineageFilter.
    """

LineageQueryDirectionEnum

class LineageQueryDirectionEnum(Enum):
    """
    Query direction enumeration.

    Values:
        ASCENDANTS = "Ascendants"
            Query upstream (what led to this)
            Example: Find all datasets used to train a model
        
        DESCENDANTS = "Descendants"
            Query downstream (what this led to)
            Example: Find all models trained from a dataset
        
        BOTH = "Both"
            Query in both directions
            Example: Complete workflow including upstream and downstream

    Usage:
        Specify direction in LineageQuery.
    
    Notes:
        - ASCENDANTS: trace back to data sources
        - DESCENDANTS: trace forward to deployments
        - BOTH: complete lineage graph
    """

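The difference between the three directions is easiest to see on a small edge list. The sketch below walks a hand-written chain breadth-first and stops at `max_depth` hops. It is an in-memory illustration only: `traverse` is a hypothetical function, the short names stand in for ARNs, and its "Both" mode treats edges as undirected, which may not match the service's behavior on branching graphs.

```python
from collections import deque
from typing import Dict, List, Set, Tuple


def traverse(edges: List[Tuple[str, str]], start: str,
             direction: str = "Descendants", max_depth: int = 10) -> Set[str]:
    """Return the nodes reachable from start within max_depth hops."""
    if direction not in ("Ascendants", "Descendants", "Both"):
        raise ValueError(f"Unknown direction: {direction}")
    neighbours: Dict[str, Set[str]] = {}
    for source, destination in edges:
        if direction in ("Descendants", "Both"):
            neighbours.setdefault(source, set()).add(destination)
        if direction in ("Ascendants", "Both"):
            neighbours.setdefault(destination, set()).add(source)
    seen = {start}
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth >= max_depth:
            continue  # do not expand beyond the depth limit
        for neighbour in neighbours.get(node, ()):
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append((neighbour, depth + 1))
    return seen - {start}


chain = [("raw", "preprocess"), ("preprocess", "processed"),
         ("processed", "train"), ("train", "model")]

print(sorted(traverse(chain, "train", "Ascendants")))
print(sorted(traverse(chain, "train", "Descendants")))
print(sorted(traverse(chain, "train", "Ascendants", max_depth=1)))
```
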
Visualization

LineageTableVisualizer

class LineageTableVisualizer:
    """
    Visualize lineage as table.

    Methods:
        show(lineage_graph) -> None
            Display lineage in table format.
            
            Parameters:
                lineage_graph: Dict - Lineage graph from query()

    Usage:
        Render lineage query results as formatted table for analysis.
    
    Notes:
        - Text-based table visualization
        - Shows entities and relationships
        - Use in Jupyter notebooks
    """

Usage:

from sagemaker.core.lineage import (
    LineageQuery,
    LineageQueryDirectionEnum,
    LineageTableVisualizer,
)

# Query lineage
query = LineageQuery(
    start_arns=[model_artifact.artifact_arn],
    direction=LineageQueryDirectionEnum.BOTH
)

graph = query.query()

# Visualize
visualizer = LineageTableVisualizer()
visualizer.show(graph)

# Output example:
# +----------------+----------------------+-------------------+
# | Type           | Name                 | ARN               |
# +----------------+----------------------+-------------------+
# | Artifact       | training-dataset-v1  | arn:aws:...       |
# | Action         | train-model-v1       | arn:aws:...       |
# | Artifact       | trained-model-v1     | arn:aws:...       |
# +----------------+----------------------+-------------------+
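
When the visualizer is not available, for example in a plain script, a similar text table can be produced with the standard library. The sketch below is not the `LineageTableVisualizer` implementation. `render_table` is a hypothetical helper and the rows are sample data.

```python
from typing import Dict, List


def render_table(rows: List[Dict[str, str]], columns: List[str]) -> str:
    """Render rows as a bordered text table with left-aligned columns."""
    widths = {
        col: max([len(col)] + [len(row.get(col, "")) for row in rows])
        for col in columns
    }
    border = "+" + "+".join("-" * (widths[col] + 2) for col in columns) + "+"

    def format_line(values: List[str]) -> str:
        cells = (value.ljust(widths[col]) for col, value in zip(columns, values))
        return "| " + " | ".join(cells) + " |"

    lines = [border, format_line(columns), border]
    lines += [format_line([row.get(col, "") for col in columns]) for row in rows]
    lines.append(border)
    return "\n".join(lines)


rows = [
    {"Type": "Artifact", "Name": "training-dataset-v1"},
    {"Type": "Action", "Name": "train-model-v1"},
]
table = render_table(rows, ["Type", "Name"])
print(table)
```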

Advanced Usage

End-to-End Pipeline Tracking

from sagemaker.core.lineage import Artifact, Action, Association, Context

# Create pipeline context
pipeline_context = Context.create(
    context_name="ml-pipeline-2024-01-15",
    context_type="Pipeline",
    properties={
        "version": "v2.0",
        "environment": "production"
    }
)

# Track entire workflow
# 1. Raw data
raw_data = Artifact.create(
    artifact_name="raw-customer-data-2024-01-15",
    artifact_type="Dataset",
    source={"source_uri": "s3://bucket/raw/2024-01-15/"},
    properties={
        "source": "CRM_system",
        "record_count": "50000",
        "date": "2024-01-15"
    }
)

# 2. Preprocessing action
preprocess = Action.create(
    action_name="preprocess-2024-01-15",
    action_type="Processing",
    properties={
        "processor": "spark",
        "transformations": "cleaning,feature_engineering,splitting"
    },
    status="Completed"
)

Association.create(raw_data.artifact_arn, preprocess.action_arn, "ContributedTo")

# 3. Processed datasets
processed_data = Artifact.create(
    artifact_name="processed-data-2024-01-15",
    artifact_type="Dataset",
    source={"source_uri": "s3://bucket/processed/2024-01-15/"},
    properties={
        "train_samples": "35000",
        "val_samples": "7500",
        "test_samples": "7500"
    }
)

Association.create(preprocess.action_arn, processed_data.artifact_arn, "Produced")

# 4. Training action
train = Action.create(
    action_name="train-2024-01-15",
    action_type="Training",
    properties={
        "algorithm": "xgboost",
        "instance_type": "ml.m5.2xlarge",
        "training_time_seconds": "1200"
    },
    status="Completed"
)

Association.create(processed_data.artifact_arn, train.action_arn, "ContributedTo")

# 5. Model artifact
model = Artifact.create(
    artifact_name="churn-model-2024-01-15",
    artifact_type="Model",
    source={"source_uri": "s3://bucket/models/2024-01-15/model.tar.gz"},
    properties={
        "accuracy": "0.94",
        "f1_score": "0.92",
        "framework": "xgboost",
        "version": "1.7.3"
    }
)

Association.create(train.action_arn, model.artifact_arn, "Produced")

# 6. Add all to pipeline context
for arn in [
    raw_data.artifact_arn,
    preprocess.action_arn,
    processed_data.artifact_arn,
    train.action_arn,
    model.artifact_arn
]:
    # Match the ARN resource type rather than any occurrence of the word "artifact"
    if ":artifact/" in arn:
        pipeline_context.add_artifact(arn, "AssociatedWith")
    else:
        pipeline_context.add_action(arn, "AssociatedWith")

print(f"Complete pipeline tracked in context: {pipeline_context.context_arn}")
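
The loop above chooses between `add_artifact` and `add_action` by inspecting the ARN text. A stricter alternative is to parse the resource type out of the standard ARN layout (`arn:partition:service:region:account:resource-type/resource-id`). `lineage_resource_type` is a hypothetical helper and the ARNs are made-up samples.

```python
def lineage_resource_type(arn: str) -> str:
    """Return the resource-type segment of a SageMaker ARN, e.g. 'artifact'."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "sagemaker":
        raise ValueError(f"Not a SageMaker ARN: {arn!r}")
    resource_type, separator, resource_id = parts[5].partition("/")
    if not separator or not resource_id:
        raise ValueError(f"ARN has no resource id: {arn!r}")
    return resource_type


sample_arns = [
    "arn:aws:sagemaker:us-west-2:123:artifact/abc123",
    # An action whose name happens to contain the word "artifact".
    "arn:aws:sagemaker:us-west-2:123:action/artifact-cleanup",
]
for arn in sample_arns:
    print(lineage_resource_type(arn))
```

A plain substring test would classify the second ARN as an artifact, whereas the parsed type is `action`.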

Model Governance

from sagemaker.core.lineage import Artifact, Association

# Track model versions with approval workflow
model_v1 = Artifact.create(
    artifact_name="customer-churn-model-v1.0",
    artifact_type="Model",
    source={"source_uri": "s3://models/churn-v1.0.tar.gz"},
    properties={
        "accuracy": "0.85",
        "approved": "true",
        "approver": "john.doe@company.com",
        "approval_date": "2024-01-10",
        "production_deployment": "endpoint-prod-v1"
    }
)

# New version with improvements
model_v2 = Artifact.create(
    artifact_name="customer-churn-model-v2.0",
    artifact_type="Model",
    source={"source_uri": "s3://models/churn-v2.0.tar.gz"},
    properties={
        "accuracy": "0.89",
        "f1_score": "0.87",
        "approved": "pending",
        "reviewer": "jane.smith@company.com",
        "improvements": "better_feature_engineering,larger_dataset"
    }
)

# Link versions
Association.create(
    model_v1.artifact_arn,
    model_v2.artifact_arn,
    "DerivedFrom"
)

# Query all model versions
models = Artifact.list(
    artifact_type="Model",
    sort_by="CreationTime",
    sort_order="Descending"
)

print("Model version history:")
for model in models:
    approval = model.properties.get("approved", "unknown")
    accuracy = model.properties.get("accuracy", "N/A")
    print(f"  {model.artifact_name}: accuracy={accuracy}, approved={approval}")

Data Provenance

from sagemaker.core.lineage import LineageQuery, LineageQueryDirectionEnum

# Query complete data lineage for production model
prod_model_arn = "arn:aws:sagemaker:us-west-2:123:artifact/production-model"

# Find all data sources
provenance_query = LineageQuery(
    start_arns=[prod_model_arn],
    direction=LineageQueryDirectionEnum.ASCENDANTS,
    max_depth=20
)

lineage = provenance_query.query()

# Extract datasets
datasets = []
for vertex in lineage['Vertices']:
    if vertex.get('Type') == 'Artifact':
        artifact_type = vertex.get('Properties', {}).get('artifact_type', '')
        if 'Dataset' in artifact_type:
            datasets.append({
                'name': vertex['Properties'].get('artifact_name'),
                'uri': vertex['Properties'].get('source_uri'),
                'samples': vertex['Properties'].get('num_samples')
            })

print(f"Production model trained from {len(datasets)} datasets:")
for ds in datasets:
    print(f"  - {ds['name']}: {ds['samples']} samples")
    print(f"    Source: {ds['uri']}")

Compliance Reporting

from datetime import datetime, timedelta, timezone

from sagemaker.core.lineage import Artifact, LineageQuery, LineageQueryDirectionEnum

# Find all models created in last 30 days
thirty_days_ago = datetime.now(timezone.utc) - timedelta(days=30)

recent_models = Artifact.list(
    artifact_type="Model",
    created_after=thirty_days_ago,  # documented as Optional[datetime]
    sort_by="CreationTime",
    sort_order="Descending"
)

# Generate compliance report
print("=== Model Compliance Report ===\n")

for model in recent_models:
    print(f"Model: {model.artifact_name}")
    print(f"  Created: {model.creation_time}")
    print(f"  Approval: {model.properties.get('approved', 'unknown')}")
    
    # Query upstream lineage
    query = LineageQuery(
        start_arns=[model.artifact_arn],
        direction=LineageQueryDirectionEnum.ASCENDANTS
    )
    lineage = query.query()
    
    # Count entities
    datasets = sum(1 for v in lineage['Vertices'] 
                  if v['Type'] == 'Artifact' and 'Dataset' in v.get('Properties', {}).get('artifact_type', ''))
    training_jobs = sum(1 for v in lineage['Vertices']
                       if v['Type'] == 'Action' and v.get('Properties', {}).get('action_type') == 'Training')
    
    print(f"  Data sources: {datasets}")
    print(f"  Training jobs: {training_jobs}")
    
    # Check for required approvals
    if model.properties.get('approved') != 'true':
        print(f"  WARNING: Model not approved for production!")
    
    print()
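
The approval check in the report can be factored into a function that returns every finding for a model instead of printing a single warning. The required keys below are an assumed policy based on the property names used in the Model Governance example. `approval_issues` is a hypothetical helper.

```python
from typing import Dict, List

# Assumed policy: keys taken from the Model Governance example properties.
REQUIRED_APPROVAL_KEYS = ("approved", "approver", "approval_date")


def approval_issues(properties: Dict[str, str]) -> List[str]:
    """Return human-readable findings for one model's properties."""
    issues = []
    if properties.get("approved") != "true":
        issues.append("not approved for production")
    for key in REQUIRED_APPROVAL_KEYS:
        if not properties.get(key):
            issues.append(f"missing property: {key}")
    return issues


print(approval_issues({"approved": "pending", "reviewer": "jane.smith@company.com"}))
```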

Reproduce Training from Lineage

import json

from sagemaker.core.lineage import LineageQuery, LineageQueryDirectionEnum

# Given a model, reproduce its training
model_arn = "arn:aws:sagemaker:us-west-2:123:artifact/model-abc"

# Query complete upstream lineage
query = LineageQuery(
    start_arns=[model_arn],
    direction=LineageQueryDirectionEnum.ASCENDANTS,
    include_edges=True
)

lineage = query.query()

# Extract training details
training_info = {}

for vertex in lineage['Vertices']:
    if vertex['Type'] == 'Action' and vertex.get('Properties', {}).get('action_type') == 'Training':
        # Extract hyperparameters; update() keeps a dataset_uri found on an earlier vertex
        properties = vertex['Properties']
        training_info.update({
            'algorithm': properties.get('algorithm'),
            'hyperparameters': json.loads(properties.get('hyperparameters', '{}')),
            'instance_type': properties.get('instance_type'),
            'training_time': properties.get('training_time_seconds')
        })
    
    elif vertex['Type'] == 'Artifact' and 'Dataset' in vertex.get('Properties', {}).get('artifact_type', ''):
        # Find training dataset
        training_info['dataset_uri'] = vertex['Properties'].get('source_uri')

print("Training configuration for reproduction:")
print(json.dumps(training_info, indent=2))

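# Use to reproduce training
# ModelTrainer, Compute, and InputData belong to the training API (see training.md).
# training_image expects a container image URI, so this works only if the "algorithm"
# property was recorded as an image URI rather than a name such as "xgboost".
trainer = ModelTrainer(
    training_image=training_info['algorithm'],
    compute=Compute(instance_type=training_info['instance_type']),
    hyperparameters=training_info['hyperparameters']
)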

train_data = InputData(
    channel_name="training",
    data_source=training_info['dataset_uri']
)

# Reproduce training
trainer.train(input_data_config=[train_data])

Validation and Constraints

Lineage Constraints

  • Maximum query depth: 100
  • Maximum start ARNs: 10 per query
  • Maximum vertices in result: 10,000
  • Maximum edges in result: 100,000
  • Query timeout: 60 seconds
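
Given the limit of 10 start ARNs per query listed above, a longer list has to be split into batches, with one query issued per batch. The sketch below shows only the batching step. `batch_start_arns` is a hypothetical helper and the ARNs are placeholders.

```python
from typing import List


def batch_start_arns(arns: List[str], batch_size: int = 10) -> List[List[str]]:
    """Split ARNs into batches of at most batch_size, dropping duplicates."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    unique = list(dict.fromkeys(arns))  # preserves first-seen order
    return [unique[i:i + batch_size] for i in range(0, len(unique), batch_size)]


placeholder_arns = [f"arn-{i}" for i in range(23)]
batches = batch_start_arns(placeholder_arns)
print([len(batch) for batch in batches])
```

Each batch can then be passed as `start_arns` to its own query, and the resulting vertices and edges merged by ARN.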

Entity Constraints

  • Artifact name: 1-120 characters
  • Maximum properties: 30 per entity
  • Property key length: 1-256 characters
  • Property value length: 1-256 characters
  • Maximum tags: 50 per entity
  • Maximum associations per entity: 1000
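
The limits listed above can be checked locally before calling `create()` or `save()`, which avoids a round trip for obviously invalid input. This sketch encodes only the numbers from this list. `validate_entity` is a hypothetical helper, and the service remains the authority on what it accepts.

```python
from typing import Dict, List, Optional

MAX_NAME_LENGTH = 120
MAX_PROPERTIES = 30
MAX_PROPERTY_LENGTH = 256
MAX_TAGS = 50


def validate_entity(
    name: str,
    properties: Optional[Dict[str, str]] = None,
    tags: Optional[List[Dict[str, str]]] = None,
) -> List[str]:
    """Return a list of constraint violations; an empty list means no problems found."""
    problems = []
    if not 1 <= len(name) <= MAX_NAME_LENGTH:
        problems.append(f"name must be 1-{MAX_NAME_LENGTH} characters")
    properties = properties or {}
    if len(properties) > MAX_PROPERTIES:
        problems.append(f"at most {MAX_PROPERTIES} properties allowed")
    for key, value in properties.items():
        if not isinstance(value, str):
            problems.append(f"property {key!r} must be a string")
            continue
        if not 1 <= len(key) <= MAX_PROPERTY_LENGTH:
            problems.append(f"property key {key!r} must be 1-{MAX_PROPERTY_LENGTH} characters")
        if not 1 <= len(value) <= MAX_PROPERTY_LENGTH:
            problems.append(f"property {key!r} value must be 1-{MAX_PROPERTY_LENGTH} characters")
    if len(tags or []) > MAX_TAGS:
        problems.append(f"at most {MAX_TAGS} tags allowed")
    return problems


print(validate_entity("training-dataset-v1", {"num_samples": "10000"}))
print(validate_entity("x" * 121, {"num_samples": 10000}))
```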

Common Error Scenarios

  1. Circular Association:

    • Cause: Creating an association that closes a cycle
    • Solution: Lineage is a directed acyclic graph (DAG), so remove or redirect the association that would form the cycle
  2. Entity Not Found:

    • Cause: The ARN does not exist or contains a typo
    • Solution: Verify the ARN and confirm that the entity exists
  3. Query Timeout:

    • Cause: The lineage graph is too large
    • Solution: Reduce max_depth and use filters
  4. Too Many Results:

    • Cause: The query returns more than 10,000 vertices
    • Solution: Use filters, reduce max_depth, or query smaller subgraphs
  5. Property Update Failed:

    • Cause: An invalid property value, or save() was not called after the change
    • Solution: Ensure property values are valid and call save() after updates
  6. Association Already Exists:

    • Cause: The same association was created twice
    • Solution: Check existing associations before creating a new one
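
The circular-association case can be caught before calling the service: adding an edge from source to destination closes a cycle exactly when the source is already reachable from the destination. The sketch below checks that on an in-memory edge list. `would_create_cycle` is a hypothetical helper and the single-letter names stand in for ARNs.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def would_create_cycle(edges: List[Tuple[str, str]], source: str, destination: str) -> bool:
    """Return True if adding source -> destination would close a cycle."""
    if source == destination:
        return True
    downstream: Dict[str, Set[str]] = defaultdict(set)
    for edge_source, edge_destination in edges:
        downstream[edge_source].add(edge_destination)
    # Depth-first search from destination, looking for a path back to source.
    stack = [destination]
    seen: Set[str] = set()
    while stack:
        node = stack.pop()
        if node == source:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(downstream.get(node, ()))
    return False


existing = [("a", "b"), ("b", "c")]
print(would_create_cycle(existing, "c", "a"))
print(would_create_cycle(existing, "a", "c"))
```

The edge list could be built from `Association.list()` results or from the `Edges` of a lineage query.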