or run

tessl search
Log in

Version

Files

docs

ai-registry.mdclarify.mddata-io.mddebugger.mdevaluation.mdexperiments.mdexplainer-config.mdindex.mdjumpstart.mdlineage.mdmlops.mdmonitoring.mdprocessing.mdremote-functions.mdresources.mds3-utilities.mdserving.mdtraining.mdworkflow-primitives.md
tile.json

mlops.mddocs/

MLOps & Pipelines

Pipeline orchestration and workflow management for building complex ML workflows with conditional execution, parallelism, and retry policies.

Capabilities

Pipeline

Main workflow orchestration class for defining and executing ML pipelines.

class Pipeline:
    """
    SageMaker Pipeline for workflow orchestration.

    Parameters:
        name: str - Pipeline name (required)
            - 1-256 characters
            - Alphanumeric and hyphens only
        
        steps: List[Step] - Pipeline steps (required)
            - List of step objects (TrainingStep, ProcessingStep, etc.)
            - Dependencies determined by step properties references
            - Maximum 50 steps per pipeline
        
        parameters: Optional[List[Parameter]] - Pipeline parameters
            - Runtime parameters (ParameterString, ParameterInteger, etc.)
            - Maximum 200 parameters
        
        sagemaker_session: Optional[Session] - SageMaker session
            - Defaults to new session if not provided
        
        pipeline_experiment_config: Optional[PipelineExperimentConfig] - Experiment configuration
            - Links executions to experiments for tracking
        
        parallelism_config: Optional[ParallelismConfiguration] - Parallelism configuration
            - Controls maximum concurrent step executions
        
        tags: Optional[List[Tag]] - Resource tags
            - Applied to pipeline and all executions

    Methods:
        create(role_arn, description=None, tags=None) -> Dict
            Create pipeline in SageMaker.
            
            Parameters:
                role_arn: str - IAM role ARN (required)
                description: Optional[str] - Pipeline description (max 3072 chars)
                tags: Optional[List[Tag]] - Additional tags
            
            Returns:
                Dict: Response with PipelineArn
            
            Raises:
                ValueError: Invalid pipeline definition or duplicate steps
                ClientError: AWS API errors
        
        update(role_arn, description=None) -> Dict
            Update existing pipeline definition.
            
            Parameters:
                role_arn: str - IAM role ARN
                description: Optional[str] - Updated description
            
            Returns:
                Dict: Response with PipelineArn
            
            Raises:
                ClientError: If pipeline doesn't exist or validation fails
        
        upsert(role_arn, description=None, tags=None) -> Dict
            Create or update pipeline.
            
            Parameters:
                role_arn: str - IAM role ARN (required)
                description: Optional[str] - Pipeline description
                tags: Optional[List[Tag]] - Tags (only for create)
            
            Returns:
                Dict: Response with PipelineArn
        
        describe() -> Dict
            Get pipeline description.
            
            Returns:
                Dict: Complete pipeline definition and metadata
        
        delete() -> None
            Delete pipeline (stops all running executions).
            
            Raises:
                ClientError: If pipeline has dependencies or deletion fails
        
        start(parameters=None, execution_display_name=None) -> PipelineExecution
            Start pipeline execution.
            
            Parameters:
                parameters: Optional[Dict[str, Any]] - Parameter values
                execution_display_name: Optional[str] - Display name for execution
            
            Returns:
                PipelineExecution: Started execution resource
            
            Raises:
                ValueError: Invalid parameter values
                ClientError: Execution start errors
        
        list_executions(sort_by="CreationTime", sort_order="Descending", max_results=10) -> List[Dict]
            List pipeline executions.
            
            Parameters:
                sort_by: str - Sort field (CreationTime or PipelineExecutionArn)
                sort_order: str - Ascending or Descending
                max_results: int - Maximum results (1-100)
            
            Returns:
                List[Dict]: Execution summaries

    Class Methods:
        attach(pipeline_name, sagemaker_session=None) -> Pipeline
            Attach to existing pipeline.
            
            Parameters:
                pipeline_name: str - Existing pipeline name
                sagemaker_session: Optional[Session] - SageMaker session
            
            Returns:
                Pipeline: Attached pipeline object
            
            Raises:
                ClientError: If pipeline doesn't exist
    
    Raises:
        ValueError: Invalid pipeline structure, circular dependencies, duplicate step names
        ClientError: AWS API errors
    
    Notes:
        - Steps execute in dependency order (DAG)
        - Circular dependencies not allowed
        - Step names must be unique
        - Maximum execution time: 45 days
        - Maximum concurrent executions: 100 (can request increase)
    """

Usage:

from sagemaker.mlops.workflow import Pipeline, TrainingStep, ProcessingStep
from sagemaker.train import ModelTrainer
from sagemaker.core import Processor

# Define processing step
process_step = ProcessingStep(
    name="PreprocessData",
    processor=processor,
    inputs=[
        ProcessingInput(
            source="s3://bucket/raw",
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination="s3://bucket/processed/train"
        ),
        ProcessingOutput(
            output_name="val",
            source="/opt/ml/processing/val",
            destination="s3://bucket/processed/val"
        )
    ]
)

# Define training step with dependency on processing
train_step = TrainingStep(
    name="TrainModel",
    estimator=trainer,
    inputs={
        "training": process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        "validation": process_step.properties.ProcessingOutputConfig.Outputs["val"].S3Output.S3Uri
    }
)

# Create pipeline
pipeline = Pipeline(
    name="ml-pipeline",
    steps=[process_step, train_step],  # Order doesn't matter (dependency-based)
    sagemaker_session=session,
    tags=[
        {"Key": "Project", "Value": "CustomerChurn"},
        {"Key": "Environment", "Value": "Production"}
    ]
)

# Create in SageMaker
try:
    response = pipeline.create(
        role_arn="arn:aws:iam::123456789012:role/SageMakerPipelineRole",
        description="Customer churn prediction pipeline"
    )
    print(f"Pipeline created: {response['PipelineArn']}")
    
    # Execute pipeline
    execution = pipeline.start()
    print(f"Execution started: {execution.arn}")
    
    # Wait for completion
    execution.wait(poll=30)
    print(f"Status: {execution.pipeline_execution_status}")
    
except ValueError as e:
    print(f"Pipeline validation error: {e}")
except ClientError as e:
    print(f"AWS API error: {e}")

Parameterized Pipeline:

from sagemaker.core.workflow import ParameterString, ParameterInteger, ParameterFloat

# Define parameters
instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge",
    enum_values=["ml.m5.xlarge", "ml.m5.2xlarge", "ml.p3.2xlarge"]
)

instance_count = ParameterInteger(
    name="InstanceCount",
    default_value=1,
    min_value=1,
    max_value=10
)

learning_rate = ParameterFloat(
    name="LearningRate",
    default_value=0.001,
    min_value=0.0001,
    max_value=0.1
)

data_path = ParameterString(
    name="InputDataPath",
    default_value="s3://bucket/default-data"
)

# Use parameters in steps
trainer = ModelTrainer(
    compute=Compute(
        instance_type=instance_type,  # Parameter reference
        instance_count=instance_count
    ),
    hyperparameters={
        "learning_rate": learning_rate  # Parameter reference
    }
)

train_step = TrainingStep(
    name="Train",
    estimator=trainer,
    inputs={"training": data_path}
)

# Create parameterized pipeline
pipeline = Pipeline(
    name="parameterized-pipeline",
    parameters=[instance_type, instance_count, learning_rate, data_path],
    steps=[train_step]
)

pipeline.create(role_arn=role)

# Execute with custom parameters
execution = pipeline.start(
    parameters={
        "TrainingInstanceType": "ml.p3.2xlarge",
        "InstanceCount": 2,
        "LearningRate": 0.01,
        "InputDataPath": "s3://bucket/experiment-data"
    }
)

PipelineGraph

class PipelineGraph:
    """
    Pipeline Directed Acyclic Graph (DAG) representation.

    Helper class for analyzing pipeline structure and dependencies.

    Attributes:
        steps: Sequence[Step] - Steps representing nodes in the DAG
        step_map: Dict[str, Step] - Mapping of step names to step instances
        adjacency_list: Dict[str, List[str]] - DAG adjacency list representation

    Methods:
        is_cyclic() -> bool
            Check if graph contains cycles.
            
            Returns:
                bool: True if cycles detected, False otherwise
        
        get_dependencies(step_name) -> List[str]
            Get dependencies for a step.
            
            Parameters:
                step_name: str - Name of step
            
            Returns:
                List[str]: List of step names this step depends on
        
        topological_sort() -> List[Step]
            Get topologically sorted steps.
            
            Returns:
                List[Step]: Steps in execution order
            
            Raises:
                ValueError: If graph contains cycles

    Class Methods:
        from_pipeline(pipeline) -> PipelineGraph
            Create graph from Pipeline.
            
            Parameters:
                pipeline: Pipeline - Pipeline to analyze
            
            Returns:
                PipelineGraph: Graph representation

    Notes:
        - Use to validate pipeline structure before deployment
        - Detect circular dependencies
        - Understand execution order
        - Identify parallel execution opportunities
    """

Usage:

from sagemaker.mlops.workflow import Pipeline, PipelineGraph

# Create pipeline
pipeline = Pipeline(name="my-pipeline", steps=[step1, step2, step3, step4])

# Analyze pipeline graph
graph = PipelineGraph.from_pipeline(pipeline)

# Check for cycles (invalid)
if graph.is_cyclic():
    raise ValueError("Pipeline contains cycles!")

# Get execution order
sorted_steps = graph.topological_sort()
print("Execution order:")
for i, step in enumerate(sorted_steps, 1):
    print(f"  {i}. {step.name}")

# Analyze dependencies
for step in graph.steps:
    deps = graph.get_dependencies(step.name)
    if deps:
        print(f"{step.name} depends on: {deps}")
    else:
        print(f"{step.name} has no dependencies (can run first)")

StepCollection

class StepCollection:
    """
    Wrapper for grouping multiple pipeline steps.

    Allows treating multiple steps as a single unit with shared dependencies.

    Attributes:
        name: str - Name of the step collection (required)
        steps: List[Step] - List of steps in the collection (required)
        depends_on: List[Union[str, Step, StepCollection, StepOutput]] - Dependencies
            - Step names (strings)
            - Step objects
            - Other StepCollections
            - StepOutput objects

    Methods:
        request_dicts() -> List[RequestType]
            Get request structures for all steps.
            
            Returns:
                List[RequestType]: API request dicts for all steps in collection

    Properties:
        properties: Properties
            Access properties from steps in collection.

    Notes:
        - All steps in collection share same dependencies
        - Steps within collection can depend on each other
        - Useful for organizing related preprocessing or evaluation steps
    """

Usage:

from sagemaker.mlops.workflow import StepCollection, ProcessingStep

# Group related preprocessing steps
preprocessing_steps = StepCollection(
    name="DataPreprocessing",
    steps=[
        ProcessingStep(
            name="CleanData",
            processor=clean_processor,
            inputs=[ProcessingInput(source="s3://bucket/raw")],
            outputs=[ProcessingOutput(
                output_name="cleaned",
                destination="s3://bucket/cleaned"
            )]
        ),
        ProcessingStep(
            name="FeatureEngineering",
            processor=feature_processor,
            inputs=[ProcessingInput(source="s3://bucket/cleaned")],
            outputs=[ProcessingOutput(
                output_name="features",
                destination="s3://bucket/features"
            )]
        ),
        ProcessingStep(
            name="SplitData",
            processor=split_processor,
            inputs=[ProcessingInput(source="s3://bucket/features")],
            outputs=[
                ProcessingOutput(output_name="train", destination="s3://bucket/train"),
                ProcessingOutput(output_name="val", destination="s3://bucket/val")
            ]
        )
    ]
)

# Training depends on entire preprocessing collection
training_step = TrainingStep(
    name="Train",
    estimator=trainer,
    depends_on=[preprocessing_steps]  # Runs after all preprocessing completes
)

pipeline = Pipeline(
    name="pipeline-with-collection",
    steps=[preprocessing_steps, training_step]
)

Step Types

TrainingStep

class TrainingStep:
    """
    Training step for pipelines.

    Parameters:
        name: str - Step name (required, unique within pipeline)
        estimator: ModelTrainer - Model trainer instance (required)
        inputs: Optional[Union[TrainingInput, Dict]] - Training inputs
            - Dict: {"channel_name": s3_uri_or_property}
            - TrainingInput: Full input specification
        cache_config: Optional[CacheConfig] - Caching configuration
        retry_policies: Optional[List[RetryPolicy]] - Retry policies
        depends_on: Optional[List[Union[str, Step]]] - Dependencies
            - List of step names or step objects

    Properties:
        properties: Properties - Access to step outputs
            - properties.TrainingJobName: Training job name
            - properties.ModelArtifacts.S3ModelArtifacts: Model S3 URI
            - properties.FinalMetricDataList[metric_name].Value: Metric values

    Notes:
        - Automatically depends on steps referenced in inputs
        - Can reference properties from previous steps
        - Supports caching to skip if inputs unchanged
    """

ProcessingStep

class ProcessingStep:
    """
    Processing step for data processing.

    Parameters:
        name: str - Step name (required, unique)
        processor: Processor - Processor instance (required)
        inputs: Optional[List[ProcessingInput]] - Processing inputs
        outputs: Optional[List[ProcessingOutput]] - Processing outputs
        job_arguments: Optional[List[str]] - Command-line arguments
        code: Optional[str] - Processing script path
        cache_config: Optional[CacheConfig] - Caching configuration
        retry_policies: Optional[List[RetryPolicy]] - Retry policies
        depends_on: Optional[List[Union[str, Step]]] - Dependencies
        property_files: Optional[List[PropertyFile]] - Property files for JsonGet

    Properties:
        properties: Properties - Access to step outputs
            - properties.ProcessingJobName: Job name
            - properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri: Output URIs
            - properties.ProcessingJobStatus: Job status

    Notes:
        - Use for data preprocessing, feature engineering, evaluation
        - Can output property files for conditional logic
        - Supports multiple inputs and outputs
        - Arguments passed to container as command-line args
    """

TransformStep

class TransformStep:
    """
    Batch transform step.

    Parameters:
        name: str - Step name (required, unique)
        transformer: Transformer - Transformer instance (required)
        inputs: TransformInput - Transform input configuration (required)
        cache_config: Optional[CacheConfig] - Caching configuration
        retry_policies: Optional[List[RetryPolicy]] - Retry policies
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to step outputs
            - properties.TransformJobName: Job name
            - properties.TransformOutput.S3OutputPath: Output URI

    Notes:
        - Use for batch inference on large datasets
        - More cost-effective than real-time endpoints for batch
        - Can split large files across multiple instances
    """

TuningStep

class TuningStep:
    """
    Hyperparameter tuning step.

    Parameters:
        name: str - Step name (required, unique)
        tuner: HyperparameterTuner - Tuner instance (required)
        inputs: Optional[Union[TrainingInput, Dict]] - Training inputs
        cache_config: Optional[CacheConfig] - Caching configuration
        retry_policies: Optional[List[RetryPolicy]] - Retry policies
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to tuning outputs
            - properties.HyperParameterTuningJobName: Tuning job name
            - properties.BestTrainingJob.TrainingJobName: Best job name
            - properties.BestTrainingJob.ModelArtifacts.S3ModelArtifacts: Best model URI

    Notes:
        - Can reference best model from tuning in subsequent steps
        - Supports caching based on input data and hyperparameter ranges
    """

ConditionStep

class ConditionStep:
    """
    Conditional execution step.

    Parameters:
        name: str - Step name (required, unique)
        conditions: List[Condition] - Conditions to evaluate (required)
            - All conditions must be True for if_steps
            - Supports: ConditionEquals, ConditionGreaterThan, etc.
        if_steps: List[Step] - Steps to execute if conditions are True (required)
        else_steps: Optional[List[Step]] - Steps to execute if conditions are False
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Notes:
        - All conditions combined with AND logic
        - Use ConditionOr for OR logic
        - Can nest conditional steps
        - Steps in if_steps/else_steps can have their own dependencies
        - Only executed branch contributes to execution time/cost
    """

Conditional Pipeline Example:

from sagemaker.mlops.workflow import Pipeline, TrainingStep, ConditionStep, ModelStep
from sagemaker.core.workflow import ConditionGreaterThan, ParameterFloat

# Define accuracy threshold parameter
accuracy_threshold = ParameterFloat(name="AccuracyThreshold", default_value=0.9)

# Training step
train_step = TrainingStep(name="Train", estimator=trainer)

# Model registration step
register_step = ModelStep(
    name="RegisterModel",
    step_args=builder.register_args(
        model_package_group_name="models",
        approval_status="PendingManualApproval"
    )
)

# Failure step
fail_step = FailStep(
    name="TrainingFailed",
    error_message="Model accuracy below threshold"
)

# Condition based on training accuracy
accuracy_condition = ConditionGreaterThan(
    left=train_step.properties.FinalMetricDataList["validation:accuracy"].Value,
    right=accuracy_threshold
)

# Conditional registration
condition_step = ConditionStep(
    name="CheckAccuracy",
    conditions=[accuracy_condition],
    if_steps=[register_step],  # Register if accuracy good
    else_steps=[fail_step]  # Fail if accuracy low
)

# Create conditional pipeline
pipeline = Pipeline(
    name="conditional-pipeline",
    parameters=[accuracy_threshold],
    steps=[train_step, condition_step]
)

# Execute with custom threshold
execution = pipeline.start(
    parameters={"AccuracyThreshold": 0.95}
)

ModelStep

class ModelStep:
    """
    Model creation step.

    Parameters:
        name: str - Step name (required, unique)
        step_args: Dict - Step arguments from ModelBuilder (required)
            - Obtained from builder.build_args() or builder.register_args()
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to model outputs
            - properties.ModelName: Created model name
            - properties.PrimaryContainer.Image: Container image URI

    Notes:
        - Use to create SageMaker Model resource in pipeline
        - Often follows training step
        - Model can be deployed in subsequent steps
    """

LambdaStep

class LambdaStep:
    """
    AWS Lambda function step.

    Parameters:
        name: str - Step name (required, unique)
        lambda_func: Union[Lambda, str] - Lambda function or ARN (required)
            - Lambda object
            - Function ARN: "arn:aws:lambda:region:account:function:name"
        inputs: Dict - Lambda function inputs (required)
            - JSON-serializable dictionary
            - Can include pipeline variables
        outputs: Optional[List[LambdaOutput]] - Lambda outputs
            - Define output variables from Lambda response
        cache_config: Optional[CacheConfig] - Caching configuration
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to Lambda outputs
            - properties.OutputName: Defined output values

    Notes:
        - Lambda timeout: maximum 15 minutes
        - Use for custom logic not available in SageMaker
        - Lambda must return JSON-serializable output
        - Lambda execution role needs permissions for SageMaker API calls
    """

Usage:

from sagemaker.mlops.workflow import LambdaStep, LambdaOutput

# Define Lambda step for custom validation
validate_step = LambdaStep(
    name="ValidateData",
    lambda_func="arn:aws:lambda:us-west-2:123:function:validate-data",
    inputs={
        "data_uri": process_step.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri,
        "schema_uri": "s3://bucket/schema.json"
    },
    outputs=[
        LambdaOutput(output_name="is_valid", output_type="Boolean"),
        LambdaOutput(output_name="error_count", output_type="Integer")
    ]
)

# Use Lambda outputs in condition
is_valid = validate_step.properties.Outputs["is_valid"]
condition = ConditionEquals(left=is_valid, right=True)

condition_step = ConditionStep(
    name="CheckValidation",
    conditions=[condition],
    if_steps=[train_step],
    else_steps=[fail_step]
)

CallbackStep

class CallbackStep:
    """
    Custom callback step for external integrations.

    Parameters:
        name: str - Step name (required, unique)
        sqs_queue_url: str - SQS queue URL for callbacks (required)
            - Format: "https://sqs.region.amazonaws.com/account/queue-name"
        inputs: Dict - Callback inputs (required)
            - Sent to SQS queue
            - JSON-serializable
        outputs: Optional[List[CallbackOutput]] - Callback outputs
            - Define expected outputs from external system
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to callback outputs
            - properties.OutputName: Output values from callback response

    Notes:
        - Pipeline pauses until callback received
        - External system must send response to SQS queue
        - Response format: {"token": "...", "output": {...}}
        - Maximum wait time: 7 days
        - Use for manual approval, external validation, etc.
    """

Usage:

from sagemaker.mlops.workflow import CallbackStep, CallbackOutput

# Create SQS queue for callbacks
import boto3
sqs = boto3.client('sqs')
queue_url = sqs.create_queue(QueueName='pipeline-callbacks')['QueueUrl']

# Define callback step for manual approval
approval_step = CallbackStep(
    name="ManualApproval",
    sqs_queue_url=queue_url,
    inputs={
        "model_metrics": train_step.properties.FinalMetricDataList,
        "model_uri": train_step.properties.ModelArtifacts.S3ModelArtifacts
    },
    outputs=[
        CallbackOutput(output_name="approved", output_type="Boolean"),
        CallbackOutput(output_name="comments", output_type="String")
    ]
)

# External system sends approval
# Response format:
# {
#     "token": "<callback_token_from_sqs_message>",
#     "output": {
#         "approved": true,
#         "comments": "Model approved for production"
#     }
# }

# Continue based on approval
condition = ConditionEquals(
    left=approval_step.properties.Outputs["approved"],
    right=True
)

condition_step = ConditionStep(
    name="CheckApproval",
    conditions=[condition],
    if_steps=[deploy_step],
    else_steps=[fail_step]
)

FailStep

class FailStep:
    """
    Explicit failure step for error handling.

    Parameters:
        name: str - Step name (required, unique)
        error_message: Union[str, PipelineVariable] - Error message (required)
            - Static string or pipeline variable
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Notes:
        - Explicitly fails pipeline execution with custom message
        - Use in else_steps of conditional logic
        - Execution marked as Failed with provided error message
        - Useful for validation failures or unmet requirements
    """

QualityCheckStep

class QualityCheckStep:
    """
    Model/data quality check step.

    Parameters:
        name: str - Step name (required, unique)
        check_job_config: CheckJobConfig - Check job configuration (required)
        quality_check_config: QualityCheckConfig - Quality configuration (required)
        model_package_group_name: Optional[str] - Model package group for baselines
        skip_check: Optional[bool] - Skip check (default: False)
        register_new_baseline: Optional[bool] - Register new baseline (default: False)
        supplied_baseline_constraints: Optional[str] - S3 URI for existing baseline
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to quality check outputs
            - properties.CalculatedBaselineConstraints: S3 URI
            - properties.BaselineUsedForDriftCheckConstraints: S3 URI

    Notes:
        - Validates data or model quality against baseline
        - Can register first run as baseline
        - Subsequent runs check drift from baseline
        - Fails pipeline if quality below threshold
    """

ClarifyCheckStep

class ClarifyCheckStep:
    """
    Bias and explainability check step.

    Parameters:
        name: str - Step name (required, unique)
        clarify_check_config: Dict - Clarify configuration (required)
            - bias_config or explainability_config
        check_job_config: CheckJobConfig - Check job configuration (required)
        skip_check: Optional[bool] - Skip check (default: False)
        register_new_baseline: Optional[bool] - Register new baseline (default: False)
        supplied_baseline_constraints: Optional[str] - S3 URI for baseline
        model_package_group_name: Optional[str] - Model package group
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to Clarify check outputs
            - properties.CalculatedBaselineConstraints: Bias/explainability baseline
            - properties.ViolationReport: S3 URI if violations detected

    Notes:
        - Checks model bias or explainability
        - Fails pipeline if bias exceeds threshold
        - Use after training or before deployment
    """

AutoMLStep

class AutoMLStep:
    """
    AutoML training step.

    Parameters:
        name: str - Step name (required, unique)
        step_args: Dict - AutoML step arguments (required)
            - From AutoML.get_step_args()
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to AutoML outputs
            - properties.BestCandidate: Best model candidate
            - properties.BestCandidate.ModelInsightsPath: Model insights

    Notes:
        - Automatically finds best model and hyperparameters
        - Longer execution time than manual training
        - Use for baseline models or when expertise limited
    """

EMRStep

class EMRStep:
    """
    Amazon EMR step for big data processing.

    Parameters:
        name: str - Step name (required, unique)
        cluster_id: Union[str, PipelineVariable] - EMR cluster ID (required)
            - Existing cluster ID or pipeline variable
        step_config: Union[EMRStepConfig, Dict] - EMR step configuration (required)
            - Jar, HadoopJarStep, or SparkStep config
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to EMR step outputs
            - properties.ClusterId: EMR cluster ID
            - properties.StepId: EMR step ID

    Notes:
        - Requires existing EMR cluster
        - Use for large-scale data processing with Spark/Hadoop
        - EMR cluster must be in same region as pipeline
    """

NotebookJobStep

class NotebookJobStep:
    """
    Notebook execution step.

    Parameters:
        name: str - Step name (required, unique)
        notebook_job_name: str - Notebook job name (required)
        input_notebook: str - S3 URI of input notebook (required)
        image_uri: str - Container image URI (required)
        kernel_name: str - Jupyter kernel name (required)
        role: str - IAM role ARN (required)
        instance_type: str - Instance type (required)
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to notebook outputs
            - properties.OutputNotebookLocation: S3 URI of executed notebook

    Notes:
        - Executes Jupyter notebooks in pipeline
        - Useful for exploratory analysis in automated workflows
        - Output notebook contains execution results
    """

MonitorBatchTransformStep

class MonitorBatchTransformStep:
    """
    Batch transform with monitoring.

    Parameters:
        name: str - Step name (required, unique)
        transform_step_name: str - Transform step to monitor (required)
        monitor_configuration: Dict - Monitoring configuration (required)
        check_job_configuration: CheckJobConfig - Check job configuration (required)
        depends_on: Optional[List[Union[str, Step]]] - Dependencies

    Properties:
        properties: Properties - Access to monitoring outputs
            - properties.CalculatedBaselineConstraints: Baseline
            - properties.ViolationReport: Violations if any

    Notes:
        - Monitors transform job outputs for quality/drift
        - Fails if violations detected
        - Use for production batch inference monitoring
    """

Configuration Classes

PipelineExperimentConfig

class PipelineExperimentConfig:
    """
    Experiment configuration for pipelines.

    Fields:
        experiment_name: Union[str, PipelineVariable] - Experiment name (required)
            - Static string or ExecutionVariables.PIPELINE_NAME
        trial_name: Union[str, PipelineVariable] - Trial name (required)
            - Typically ExecutionVariables.PIPELINE_EXECUTION_ID for unique trials

    Notes:
        - Links each pipeline execution to an experiment trial
        - Enables tracking pipeline runs in Experiments
        - Trial automatically created per execution
    """

Usage:

from sagemaker.mlops.workflow import PipelineExperimentConfig
from sagemaker.core.workflow import ExecutionVariables

# Configure experiment tracking
experiment_config = PipelineExperimentConfig(
    experiment_name="my-experiment",
    trial_name=ExecutionVariables.PIPELINE_EXECUTION_ID  # Unique per execution
)

pipeline = Pipeline(
    name="experiment-pipeline",
    steps=[train_step],
    pipeline_experiment_config=experiment_config
)

# Each execution creates new trial in experiment
execution1 = pipeline.start()  # Creates trial-1
execution2 = pipeline.start()  # Creates trial-2

ParallelismConfiguration

class ParallelismConfiguration:
    """
    Parallelism configuration for pipeline execution.

    Fields:
        max_parallel_execution_steps: int - Maximum parallel steps (required)
            - Range: 1-50
            - Limits number of steps running simultaneously

    Notes:
        - Controls resource usage
        - Steps without dependencies run in parallel up to limit
        - Does not affect steps with dependencies (run in sequence)
        - Higher parallelism = faster execution but more concurrent resources
    """

Usage:

from sagemaker.mlops.workflow import ParallelismConfiguration

# Limit concurrent step execution
parallelism_config = ParallelismConfiguration(
    max_parallel_execution_steps=3
)

# Create pipeline with 6 independent processing steps
pipeline = Pipeline(
    name="parallel-pipeline",
    steps=[proc1, proc2, proc3, proc4, proc5, proc6],  # All independent
    parallelism_config=parallelism_config
)

# Executes 3 steps at a time:
# Round 1: proc1, proc2, proc3
# Round 2: proc4, proc5, proc6

CacheConfig

class CacheConfig:
    """
    Step caching configuration.

    Parameters:
        enable_caching: bool - Enable caching (required)
        expire_after: Optional[str] - Cache expiration (ISO 8601 duration)
            - Format: "PT1H" (1 hour), "P7D" (7 days), "P30D" (30 days)
            - Default: cache never expires

    Notes:
        - Caches step output based on input data and configuration
        - Skips execution if inputs unchanged and cache valid
        - Saves time and cost for expensive steps
        - Cache invalidated if:
            - Inputs changed
            - Step configuration changed
            - Cache expired
    """

Usage:

from sagemaker.mlops.workflow import ProcessingStep, CacheConfig

# Enable caching for expensive preprocessing
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="P7D"  # Cache for 7 days
)

process_step = ProcessingStep(
    name="ExpensivePreprocessing",
    processor=processor,
    inputs=[...],
    outputs=[...],
    cache_config=cache_config
)

# First execution: runs processing
execution1 = pipeline.start()

# Second execution with same inputs: uses cache (skips processing)
execution2 = pipeline.start()

# Execution after 7 days: cache expired, runs processing
# Or if inputs changed: cache invalid, runs processing

SelectiveExecutionConfig

class SelectiveExecutionConfig:
    """
    Selective execution configuration.

    Fields:
        source_pipeline_execution_arn: str - Source pipeline execution ARN (required)
            - Previous execution to start from
        selected_steps: List[SelectedStep] - Steps to execute (required)
            - Specific steps to re-run

    Notes:
        - Re-run only specific steps from failed execution
        - Useful for debugging and iterative development
        - Steps must exist in source execution
        - Dependent steps automatically included
    """

Usage:

# Pipeline execution failed at evaluation step
failed_execution_arn = "arn:aws:sagemaker:us-west-2:123:pipeline/my-pipeline/execution/abc"

# Re-run only evaluation and subsequent steps
from sagemaker.core.shapes import SelectedStep

selective_config = SelectiveExecutionConfig(
    source_pipeline_execution_arn=failed_execution_arn,
    selected_steps=[
        SelectedStep(step_name="EvaluateModel"),
        SelectedStep(step_name="RegisterModel")
    ]
)

# Start selective execution
execution = pipeline.start(
    execution_description="Re-run evaluation only",
    selective_execution_config=selective_config
)

CheckJobConfig

class CheckJobConfig:
    """
    Configuration for check jobs (quality, bias, explainability).

    Fields:
        role: str - IAM role ARN (required)
        instance_count: int - Number of instances (default: 1)
        instance_type: str - Instance type (required)
        volume_size_in_gb: int - EBS volume size (default: 30)
        volume_kms_key: Optional[str] - KMS key for volume encryption
        output_s3_uri: str - S3 output URI (required)
        max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 3600)
        base_job_name: Optional[str] - Base job name
        sagemaker_session: Optional[Session] - SageMaker session
        env: Optional[Dict[str, str]] - Environment variables
        tags: Optional[List[Tag]] - Resource tags
        network_config: Optional[NetworkConfig] - Network configuration

    Notes:
        - Used by QualityCheckStep and ClarifyCheckStep
        - Provisions resources for running checks
        - Results saved to output_s3_uri
    """

Retry Policies

StepRetryPolicy

class StepRetryPolicy:
    """
    Retry policy for pipeline steps.

    Parameters:
        exception_types: List[StepExceptionTypeEnum] - Exception types to retry (required)
        interval_seconds: int - Interval between retries (required)
            - Range: 1-300 seconds
        backoff_rate: float - Backoff rate multiplier (required)
            - Range: 1.0-2.0
            - Next interval = current interval * backoff_rate
        max_attempts: int - Maximum retry attempts (required)
            - Range: 1-10
        expire_after_mins: Optional[int] - Expiration time in minutes
            - Stop retrying after this time regardless of attempts

    Notes:
        - Retries for infrastructure failures (throttling, service faults)
        - Does not retry application errors
        - Backoff prevents overwhelming services
    """

SageMakerJobStepRetryPolicy

class SageMakerJobStepRetryPolicy:
    """
    Retry policy for SageMaker job steps (Training, Processing, Transform).

    Parameters:
        exception_types: List[SageMakerJobExceptionTypeEnum] - Exception types (required)
            - CAPACITY_ERROR: Insufficient instance capacity
            - INTERNAL_ERROR: Internal service errors
            - RESOURCE_LIMIT: Resource limits exceeded
        interval_seconds: int - Interval between retries (required)
        backoff_rate: float - Backoff rate multiplier (required)
        max_attempts: int - Maximum retry attempts (required)
        expire_after_mins: Optional[int] - Expiration time

    Notes:
        - Specific to SageMaker job failures
        - Retries capacity errors (common with spot/GPU instances)
        - Use for production pipelines requiring resilience
    """

Usage:

from sagemaker.mlops.workflow import (
    TrainingStep,
    SageMakerJobStepRetryPolicy,
    SageMakerJobExceptionTypeEnum
)

# Configure retry for capacity errors
retry_policy = SageMakerJobStepRetryPolicy(
    exception_types=[
        SageMakerJobExceptionTypeEnum.CAPACITY_ERROR,
        SageMakerJobExceptionTypeEnum.INTERNAL_ERROR
    ],
    interval_seconds=60,  # Start with 1 minute
    backoff_rate=2.0,  # Double each retry (1min, 2min, 4min, ...)
    max_attempts=5,
    expire_after_mins=60  # Stop after 1 hour total
)

# Apply to training step
train_step = TrainingStep(
    name="TrainWithRetry",
    estimator=trainer,
    retry_policies=[retry_policy]
)

# Pipeline automatically retries on capacity errors
pipeline = Pipeline(name="resilient-pipeline", steps=[train_step])

StepExceptionTypeEnum

class StepExceptionTypeEnum(Enum):
    """
    Step exception types for retry policies.

    Values:
        STEP_SERVICE_FAULT = "Step.SERVICE_FAULT"
            - Internal service faults
            - Transient errors from SageMaker service
        
        STEP_THROTTLING = "Step.THROTTLING"
            - API throttling errors
            - Too many concurrent API calls

    Notes:
        - These are general step-level errors
        - Different from SageMaker job-specific errors
        - Typically transient and benefit from retry
    """

SageMakerJobExceptionTypeEnum

class SageMakerJobExceptionTypeEnum(Enum):
    """
    SageMaker job exception types.

    Values:
        CAPACITY_ERROR = "SageMaker.CAPACITY_ERROR"
            - Insufficient capacity for requested instances
            - Common with GPU instances or spot
            - Retry often succeeds
        
        INTERNAL_ERROR = "SageMaker.INTERNAL_ERROR"
            - Internal service errors
            - Transient errors
            - Retry recommended
        
        RESOURCE_LIMIT = "SageMaker.RESOURCE_LIMIT"
            - Resource limits exceeded (service quotas)
            - Retry may not help unless quota increased
            - Check service quotas

    Notes:
        - Specific to SageMaker job steps
        - CAPACITY_ERROR most common, should always retry
        - RESOURCE_LIMIT may require quota increase
    """

Triggers and Scheduling

Trigger

class Trigger:
    """
    Pipeline trigger configuration.

    Parameters:
        source_pipeline_execution_status: str - Source execution status (required)
            - "Succeeded": Trigger on successful completion
            - "Failed": Trigger on failure
            - "Stopped": Trigger when stopped

    Notes:
        - Triggers another pipeline based on execution status
        - Use for chaining pipelines
        - Example: trigger evaluation pipeline after training succeeds
    """

PipelineSchedule

class PipelineSchedule:
    """
    Schedule-based pipeline trigger.

    Parameters:
        cron_expression: str - Cron expression (required)
            - Format: "cron(0 12 * * ? *)" (daily at noon)
            - UTC timezone
        state: str - Schedule state (required)
            - "ENABLED": Schedule active
            - "DISABLED": Schedule inactive

    Notes:
        - Automatically starts pipeline executions on schedule
        - Use for periodic training, data processing, etc.
        - Cron format: minute hour day month day-of-week year
        - All times in UTC
    """

Usage:

# Schedule pipeline for daily execution
from sagemaker.mlops.workflow import Pipeline
import boto3

pipeline.create(role_arn=role)

# Create EventBridge rule for scheduling
events = boto3.client('events')

events.put_rule(
    Name='daily-training-pipeline',
    ScheduleExpression='cron(0 2 * * ? *)',  # 2 AM UTC daily
    State='ENABLED',
    Description='Daily model retraining'
)

# Add target to start pipeline
events.put_targets(
    Rule='daily-training-pipeline',
    Targets=[{
        'Id': '1',
        'Arn': f'arn:aws:sagemaker:{region}:{account}:pipeline/{pipeline.name}',
        'RoleArn': events_role_arn,
        'SageMakerPipelineParameters': {
            'PipelineParameterList': [
                {'Name': 'InputData', 'Value': 's3://bucket/daily-data'}
            ]
        }
    }]
)

Advanced Usage

Complex Multi-Step Pipeline

from sagemaker.mlops.workflow import (
    Pipeline, ProcessingStep, TrainingStep, TuningStep,
    ConditionStep, ModelStep, TransformStep
)
from sagemaker.core.workflow import (
    ParameterString, ConditionGreaterThan, Join, ExecutionVariables
)

# Parameters
data_path = ParameterString(name="InputData", default_value="s3://bucket/data")
accuracy_threshold = ParameterFloat(name="AccuracyThreshold", default_value=0.90)

# Dynamic output path
output_path = Join(
    on="/",
    values=["s3://bucket/pipelines", ExecutionVariables.PIPELINE_EXECUTION_ID]
)

# Step 1: Data validation
validate_step = ProcessingStep(
    name="ValidateData",
    processor=validation_processor,
    inputs=[ProcessingInput(source=data_path)],
    outputs=[ProcessingOutput(
        output_name="validated",
        destination=Join(on="/", values=[output_path, "validated"])
    )]
)

# Step 2: Feature engineering
feature_step = ProcessingStep(
    name="FeatureEngineering",
    processor=feature_processor,
    inputs=[ProcessingInput(
        source=validate_step.properties.ProcessingOutputConfig.Outputs["validated"].S3Output.S3Uri
    )],
    outputs=[
        ProcessingOutput(output_name="train", destination=Join(on="/", values=[output_path, "train"])),
        ProcessingOutput(output_name="val", destination=Join(on="/", values=[output_path, "val"])),
        ProcessingOutput(output_name="test", destination=Join(on="/", values=[output_path, "test"]))
    ],
    cache_config=CacheConfig(enable_caching=True, expire_after="P7D")
)

# Step 3: Hyperparameter tuning
tuning_step = TuningStep(
    name="TuneModel",
    tuner=tuner,
    inputs={
        "training": feature_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        "validation": feature_step.properties.ProcessingOutputConfig.Outputs["val"].S3Output.S3Uri
    }
)

# Step 4: Evaluate best model
eval_step = ProcessingStep(
    name="EvaluateModel",
    processor=eval_processor,
    inputs=[
        ProcessingInput(
            source=tuning_step.properties.BestTrainingJob.ModelArtifacts.S3ModelArtifacts
        ),
        ProcessingInput(
            source=feature_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
        )
    ],
    outputs=[ProcessingOutput(output_name="metrics", destination=Join(on="/", values=[output_path, "metrics"]))],
    property_files=[
        PropertyFile(name="EvalMetrics", output_name="metrics", path="evaluation.json")
    ]
)

# Step 5: Register if metrics good
register_step = ModelStep(
    name="RegisterModel",
    step_args=builder.register_args(
        model_package_group_name="models",
        approval_status="PendingManualApproval"
    )
)

# Step 6: Deploy to staging
deploy_step = LambdaStep(
    name="DeployToStaging",
    lambda_func=deploy_lambda_arn,
    inputs={
        "model_package_arn": register_step.properties.ModelPackageArn,
        "endpoint_name": "staging-endpoint"
    }
)

# Conditional logic
accuracy = JsonGet(
    step_name=eval_step.name,
    property_file=eval_step.property_files[0],
    json_path="metrics.accuracy"
)

condition = ConditionGreaterThan(left=accuracy, right=accuracy_threshold)

condition_step = ConditionStep(
    name="CheckAccuracy",
    conditions=[condition],
    if_steps=[register_step, deploy_step],
    else_steps=[FailStep(name="LowAccuracy", error_message="Model accuracy below threshold")]
)

# Create complete pipeline
pipeline = Pipeline(
    name="production-ml-pipeline",
    parameters=[data_path, accuracy_threshold],
    steps=[
        validate_step,
        feature_step,
        tuning_step,
        eval_step,
        condition_step
    ],
    parallelism_config=ParallelismConfiguration(max_parallel_execution_steps=5),
    tags=[{"Key": "Environment", "Value": "Production"}]
)

# Create and execute
pipeline.upsert(
    role_arn=pipeline_role,
    description="End-to-end ML pipeline with validation and conditional deployment"
)

execution = pipeline.start(
    parameters={
        "InputData": "s3://bucket/latest-data",
        "AccuracyThreshold": 0.92
    },
    execution_display_name=f"Execution-{datetime.now().isoformat()}"
)

# Monitor execution
execution.wait(poll=30)

# Get step details
status = execution.describe()
for step in status['PipelineExecutionSteps']:
    print(f"{step['StepName']}: {step['StepStatus']}")
    if step.get('FailureReason'):
        print(f"  Failure: {step['FailureReason']}")

Pipeline with Parallel Processing

# Create multiple independent preprocessing steps
preprocess_steps = []
for i, data_source in enumerate(data_sources):
    step = ProcessingStep(
        name=f"Preprocess-{i}",
        processor=processor,
        inputs=[ProcessingInput(source=data_source)],
        outputs=[ProcessingOutput(
            output_name="output",
            destination=f"s3://bucket/processed-{i}"
        )]
    )
    preprocess_steps.append(step)

# Training depends on all preprocessing completing
train_step = TrainingStep(
    name="Train",
    estimator=trainer,
    depends_on=preprocess_steps  # Waits for all
)

# All preprocessing steps run in parallel (up to parallelism limit)
pipeline = Pipeline(
    name="parallel-preprocessing",
    steps=preprocess_steps + [train_step],
    parallelism_config=ParallelismConfiguration(
        max_parallel_execution_steps=len(data_sources)
    )
)

Validation and Constraints

Pipeline Constraints

  • Pipeline name: 1-256 characters, alphanumeric and hyphens
  • Maximum steps: 50 per pipeline
  • Maximum parameters: 200 per pipeline
  • Maximum execution time: 45 days
  • Maximum concurrent executions: 100 (default, can request increase)
  • Maximum pipeline size: 1 MB JSON definition

Step Constraints

  • Step names: Must be unique within pipeline
  • Maximum dependencies per step: 50
  • Maximum property file size: 100 MB
  • Maximum retry attempts: 10
  • Callback step timeout: 7 days

Execution Constraints

  • Maximum concurrent executions per pipeline: 100
  • Execution history retention: 120 days
  • Maximum parameters per execution: Same as pipeline definition
  • Parameter value length: Maximum 1024 characters

Common Error Scenarios

  1. Circular Dependency Detected:

    • Cause: Step A depends on B, B depends on A
    • Solution: Use PipelineGraph.is_cyclic() to validate before create
  2. Step Name Conflict:

    • Cause: Duplicate step names in pipeline
    • Solution: Ensure all step names unique
  3. Invalid Parameter Reference:

    • Cause: Referencing undefined parameter
    • Solution: Define all parameters in pipeline.parameters list
  4. Property Not Available:

    • Cause: Accessing property from non-dependent step
    • Solution: Add proper depends_on or use properties to create dependency
  5. Cache Miss on Sensitive Data:

    • Cause: Cached step using outdated data
    • Solution: Set appropriate expire_after or disable caching for dynamic data
  6. Execution Timeout:

    • Cause: Pipeline exceeds 45-day limit
    • Solution: Break into smaller pipelines, optimize long-running steps