Pipeline orchestration and workflow management for building complex ML workflows with conditional execution, parallelism, and retry policies.
Main workflow orchestration class for defining and executing ML pipelines.
class Pipeline:
"""
SageMaker Pipeline for workflow orchestration.
Parameters:
name: str - Pipeline name (required)
- 1-256 characters
- Alphanumeric and hyphens only
steps: List[Step] - Pipeline steps (required)
- List of step objects (TrainingStep, ProcessingStep, etc.)
- Dependencies determined by step properties references
- Maximum 50 steps per pipeline
parameters: Optional[List[Parameter]] - Pipeline parameters
- Runtime parameters (ParameterString, ParameterInteger, etc.)
- Maximum 200 parameters
sagemaker_session: Optional[Session] - SageMaker session
- Defaults to new session if not provided
pipeline_experiment_config: Optional[PipelineExperimentConfig] - Experiment configuration
- Links executions to experiments for tracking
parallelism_config: Optional[ParallelismConfiguration] - Parallelism configuration
- Controls maximum concurrent step executions
tags: Optional[List[Tag]] - Resource tags
- Applied to pipeline and all executions
Methods:
create(role_arn, description=None, tags=None) -> Dict
Create pipeline in SageMaker.
Parameters:
role_arn: str - IAM role ARN (required)
description: Optional[str] - Pipeline description (max 3072 chars)
tags: Optional[List[Tag]] - Additional tags
Returns:
Dict: Response with PipelineArn
Raises:
ValueError: Invalid pipeline definition or duplicate steps
ClientError: AWS API errors
update(role_arn, description=None) -> Dict
Update existing pipeline definition.
Parameters:
role_arn: str - IAM role ARN
description: Optional[str] - Updated description
Returns:
Dict: Response with PipelineArn
Raises:
ClientError: If pipeline doesn't exist or validation fails
upsert(role_arn, description=None, tags=None) -> Dict
Create or update pipeline.
Parameters:
role_arn: str - IAM role ARN (required)
description: Optional[str] - Pipeline description
tags: Optional[List[Tag]] - Tags (only for create)
Returns:
Dict: Response with PipelineArn
describe() -> Dict
Get pipeline description.
Returns:
Dict: Complete pipeline definition and metadata
delete() -> None
Delete pipeline (stops all running executions).
Raises:
ClientError: If pipeline has dependencies or deletion fails
start(parameters=None, execution_display_name=None, execution_description=None, selective_execution_config=None) -> PipelineExecution
Start pipeline execution.
Parameters:
parameters: Optional[Dict[str, Any]] - Parameter values
execution_display_name: Optional[str] - Display name for execution
execution_description: Optional[str] - Description for this execution
selective_execution_config: Optional[SelectiveExecutionConfig] - Re-run selected steps from a previous execution (see SelectiveExecutionConfig)
Returns:
PipelineExecution: Started execution resource
Raises:
ValueError: Invalid parameter values
ClientError: Execution start errors
list_executions(sort_by="CreationTime", sort_order="Descending", max_results=10) -> List[Dict]
List pipeline executions.
Parameters:
sort_by: str - Sort field (CreationTime or PipelineExecutionArn)
sort_order: str - Ascending or Descending
max_results: int - Maximum results (1-100)
Returns:
List[Dict]: Execution summaries
Class Methods:
attach(pipeline_name, sagemaker_session=None) -> Pipeline
Attach to existing pipeline.
Parameters:
pipeline_name: str - Existing pipeline name
sagemaker_session: Optional[Session] - SageMaker session
Returns:
Pipeline: Attached pipeline object
Raises:
ClientError: If pipeline doesn't exist
Raises:
ValueError: Invalid pipeline structure, circular dependencies, duplicate step names
ClientError: AWS API errors
Notes:
- Steps execute in dependency order (DAG)
- Circular dependencies not allowed
- Step names must be unique
- Maximum execution time: 45 days
- Maximum concurrent executions: 100 (can request increase)
"""Usage:
from sagemaker.mlops.workflow import Pipeline, TrainingStep, ProcessingStep
from sagemaker.train import ModelTrainer
from sagemaker.core import Processor
from botocore.exceptions import ClientError
# Define processing step
process_step = ProcessingStep(
name="PreprocessData",
processor=processor,
inputs=[
ProcessingInput(
source="s3://bucket/raw",
destination="/opt/ml/processing/input"
)
],
outputs=[
ProcessingOutput(
output_name="train",
source="/opt/ml/processing/train",
destination="s3://bucket/processed/train"
),
ProcessingOutput(
output_name="val",
source="/opt/ml/processing/val",
destination="s3://bucket/processed/val"
)
]
)
# Define training step with dependency on processing
train_step = TrainingStep(
name="TrainModel",
estimator=trainer,
inputs={
"training": process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
"validation": process_step.properties.ProcessingOutputConfig.Outputs["val"].S3Output.S3Uri
}
)
# Create pipeline
pipeline = Pipeline(
name="ml-pipeline",
steps=[process_step, train_step], # Order doesn't matter (dependency-based)
sagemaker_session=session,
tags=[
{"Key": "Project", "Value": "CustomerChurn"},
{"Key": "Environment", "Value": "Production"}
]
)
# Create in SageMaker
try:
response = pipeline.create(
role_arn="arn:aws:iam::123456789012:role/SageMakerPipelineRole",
description="Customer churn prediction pipeline"
)
print(f"Pipeline created: {response['PipelineArn']}")
# Execute pipeline
execution = pipeline.start()
print(f"Execution started: {execution.arn}")
# Wait for completion
execution.wait(poll=30)
print(f"Status: {execution.pipeline_execution_status}")
except ValueError as e:
print(f"Pipeline validation error: {e}")
except ClientError as e:
print(f"AWS API error: {e}")Parameterized Pipeline:
from sagemaker.core.workflow import ParameterString, ParameterInteger, ParameterFloat
# Define parameters
instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge",
enum_values=["ml.m5.xlarge", "ml.m5.2xlarge", "ml.p3.2xlarge"]
)
instance_count = ParameterInteger(
name="InstanceCount",
default_value=1,
min_value=1,
max_value=10
)
learning_rate = ParameterFloat(
name="LearningRate",
default_value=0.001,
min_value=0.0001,
max_value=0.1
)
data_path = ParameterString(
name="InputDataPath",
default_value="s3://bucket/default-data"
)
# Use parameters in steps
trainer = ModelTrainer(
compute=Compute(
instance_type=instance_type, # Parameter reference
instance_count=instance_count
),
hyperparameters={
"learning_rate": learning_rate # Parameter reference
}
)
train_step = TrainingStep(
name="Train",
estimator=trainer,
inputs={"training": data_path}
)
# Create parameterized pipeline
pipeline = Pipeline(
name="parameterized-pipeline",
parameters=[instance_type, instance_count, learning_rate, data_path],
steps=[train_step]
)
pipeline.create(role_arn=role)
# Execute with custom parameters
execution = pipeline.start(
parameters={
"TrainingInstanceType": "ml.p3.2xlarge",
"InstanceCount": 2,
"LearningRate": 0.01,
"InputDataPath": "s3://bucket/experiment-data"
}
)
class PipelineGraph:
"""
Pipeline Directed Acyclic Graph (DAG) representation.
Helper class for analyzing pipeline structure and dependencies.
Attributes:
steps: Sequence[Step] - Steps representing nodes in the DAG
step_map: Dict[str, Step] - Mapping of step names to step instances
adjacency_list: Dict[str, List[str]] - DAG adjacency list representation
Methods:
is_cyclic() -> bool
Check if graph contains cycles.
Returns:
bool: True if cycles detected, False otherwise
get_dependencies(step_name) -> List[str]
Get dependencies for a step.
Parameters:
step_name: str - Name of step
Returns:
List[str]: List of step names this step depends on
topological_sort() -> List[Step]
Get topologically sorted steps.
Returns:
List[Step]: Steps in execution order
Raises:
ValueError: If graph contains cycles
Class Methods:
from_pipeline(pipeline) -> PipelineGraph
Create graph from Pipeline.
Parameters:
pipeline: Pipeline - Pipeline to analyze
Returns:
PipelineGraph: Graph representation
Notes:
- Use to validate pipeline structure before deployment
- Detect circular dependencies
- Understand execution order
- Identify parallel execution opportunities
"""Usage:
from sagemaker.mlops.workflow import Pipeline, PipelineGraph
# Create pipeline
pipeline = Pipeline(name="my-pipeline", steps=[step1, step2, step3, step4])
# Analyze pipeline graph
graph = PipelineGraph.from_pipeline(pipeline)
# Check for cycles (invalid)
if graph.is_cyclic():
raise ValueError("Pipeline contains cycles!")
# Get execution order
sorted_steps = graph.topological_sort()
print("Execution order:")
for i, step in enumerate(sorted_steps, 1):
print(f" {i}. {step.name}")
# Analyze dependencies
for step in graph.steps:
deps = graph.get_dependencies(step.name)
if deps:
print(f"{step.name} depends on: {deps}")
else:
print(f"{step.name} has no dependencies (can run first)")class StepCollection:
"""
Wrapper for grouping multiple pipeline steps.
Allows treating multiple steps as a single unit with shared dependencies.
Attributes:
name: str - Name of the step collection (required)
steps: List[Step] - List of steps in the collection (required)
depends_on: List[Union[str, Step, StepCollection, StepOutput]] - Dependencies
- Step names (strings)
- Step objects
- Other StepCollections
- StepOutput objects
Methods:
request_dicts() -> List[RequestType]
Get request structures for all steps.
Returns:
List[RequestType]: API request dicts for all steps in collection
Properties:
properties: Properties
Access properties from steps in collection.
Notes:
- All steps in collection share same dependencies
- Steps within collection can depend on each other
- Useful for organizing related preprocessing or evaluation steps
"""Usage:
from sagemaker.mlops.workflow import StepCollection, ProcessingStep
# Group related preprocessing steps
preprocessing_steps = StepCollection(
name="DataPreprocessing",
steps=[
ProcessingStep(
name="CleanData",
processor=clean_processor,
inputs=[ProcessingInput(source="s3://bucket/raw")],
outputs=[ProcessingOutput(
output_name="cleaned",
destination="s3://bucket/cleaned"
)]
),
ProcessingStep(
name="FeatureEngineering",
processor=feature_processor,
inputs=[ProcessingInput(source="s3://bucket/cleaned")],
outputs=[ProcessingOutput(
output_name="features",
destination="s3://bucket/features"
)]
),
ProcessingStep(
name="SplitData",
processor=split_processor,
inputs=[ProcessingInput(source="s3://bucket/features")],
outputs=[
ProcessingOutput(output_name="train", destination="s3://bucket/train"),
ProcessingOutput(output_name="val", destination="s3://bucket/val")
]
)
]
)
# Training depends on entire preprocessing collection
training_step = TrainingStep(
name="Train",
estimator=trainer,
depends_on=[preprocessing_steps] # Runs after all preprocessing completes
)
pipeline = Pipeline(
name="pipeline-with-collection",
steps=[preprocessing_steps, training_step]
)
class TrainingStep:
"""
Training step for pipelines.
Parameters:
name: str - Step name (required, unique within pipeline)
estimator: ModelTrainer - Model trainer instance (required)
inputs: Optional[Union[TrainingInput, Dict]] - Training inputs
- Dict: {"channel_name": s3_uri_or_property}
- TrainingInput: Full input specification
cache_config: Optional[CacheConfig] - Caching configuration
retry_policies: Optional[List[RetryPolicy]] - Retry policies
depends_on: Optional[List[Union[str, Step]]] - Dependencies
- List of step names or step objects
Properties:
properties: Properties - Access to step outputs
- properties.TrainingJobName: Training job name
- properties.ModelArtifacts.S3ModelArtifacts: Model S3 URI
- properties.FinalMetricDataList[metric_name].Value: Metric values
Notes:
- Automatically depends on steps referenced in inputs
- Can reference properties from previous steps
- Supports caching to skip if inputs unchanged
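Example (a minimal sketch; `trainer` is a pre-configured ModelTrainer as in the Pipeline usage above, and the 30-day cache window is illustrative):
from sagemaker.mlops.workflow import TrainingStep, CacheConfig

train_step = TrainingStep(
    name="Train",
    estimator=trainer,  # pre-configured ModelTrainer (placeholder)
    inputs={"training": "s3://bucket/processed/train"},
    cache_config=CacheConfig(enable_caching=True, expire_after="P30D"),
)
# Downstream steps can reference the trained model artifacts:
model_artifacts = train_step.properties.ModelArtifacts.S3ModelArtifacts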
"""class ProcessingStep:
"""
Processing step for data processing.
Parameters:
name: str - Step name (required, unique)
processor: Processor - Processor instance (required)
inputs: Optional[List[ProcessingInput]] - Processing inputs
outputs: Optional[List[ProcessingOutput]] - Processing outputs
job_arguments: Optional[List[str]] - Command-line arguments
code: Optional[str] - Processing script path
cache_config: Optional[CacheConfig] - Caching configuration
retry_policies: Optional[List[RetryPolicy]] - Retry policies
depends_on: Optional[List[Union[str, Step]]] - Dependencies
property_files: Optional[List[PropertyFile]] - Property files for JsonGet
Properties:
properties: Properties - Access to step outputs
- properties.ProcessingJobName: Job name
- properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri: Output URIs
- properties.ProcessingJobStatus: Job status
Notes:
- Use for data preprocessing, feature engineering, evaluation
- Can output property files for conditional logic
- Supports multiple inputs and outputs
- Arguments passed to container as command-line args
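Example (a minimal sketch of the property-file pattern; `eval_processor` is a pre-configured Processor, and PropertyFile/JsonGet are used as in the complete example near the end of this section):
from sagemaker.mlops.workflow import ProcessingStep

eval_step = ProcessingStep(
    name="EvaluateModel",
    processor=eval_processor,  # pre-configured Processor (placeholder)
    inputs=[ProcessingInput(source="s3://bucket/test")],
    outputs=[ProcessingOutput(output_name="metrics", destination="s3://bucket/metrics")],
    property_files=[PropertyFile(name="EvalMetrics", output_name="metrics", path="evaluation.json")],
)
# Read a value from the property file for use in a ConditionStep
accuracy = JsonGet(
    step_name=eval_step.name,
    property_file=eval_step.property_files[0],
    json_path="metrics.accuracy",
)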
"""class TransformStep:
"""
Batch transform step.
Parameters:
name: str - Step name (required, unique)
transformer: Transformer - Transformer instance (required)
inputs: TransformInput - Transform input configuration (required)
cache_config: Optional[CacheConfig] - Caching configuration
retry_policies: Optional[List[RetryPolicy]] - Retry policies
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to step outputs
- properties.TransformJobName: Job name
- properties.TransformOutput.S3OutputPath: Output URI
Notes:
- Use for batch inference on large datasets
- More cost-effective than real-time endpoints for batch
- Can split large files across multiple instances
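Example (a minimal sketch; `transformer` is a pre-configured Transformer and the TransformInput argument shown is an assumption):
from sagemaker.mlops.workflow import TransformStep

transform_step = TransformStep(
    name="BatchInference",
    transformer=transformer,  # pre-configured Transformer (placeholder)
    inputs=TransformInput(data="s3://bucket/batch-input"),  # argument name assumed
    depends_on=[train_step],
)
# transform_step.properties.TransformOutput.S3OutputPath can feed a later ProcessingStep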
"""class TuningStep:
"""
Hyperparameter tuning step.
Parameters:
name: str - Step name (required, unique)
tuner: HyperparameterTuner - Tuner instance (required)
inputs: Optional[Union[TrainingInput, Dict]] - Training inputs
cache_config: Optional[CacheConfig] - Caching configuration
retry_policies: Optional[List[RetryPolicy]] - Retry policies
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to tuning outputs
- properties.HyperParameterTuningJobName: Tuning job name
- properties.BestTrainingJob.TrainingJobName: Best job name
- properties.BestTrainingJob.ModelArtifacts.S3ModelArtifacts: Best model URI
Notes:
- Can reference best model from tuning in subsequent steps
- Supports caching based on input data and hyperparameter ranges
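Example (a minimal sketch; `tuner` is a pre-configured HyperparameterTuner):
from sagemaker.mlops.workflow import TuningStep

tuning_step = TuningStep(
    name="TuneModel",
    tuner=tuner,  # pre-configured HyperparameterTuner (placeholder)
    inputs={
        "training": "s3://bucket/processed/train",
        "validation": "s3://bucket/processed/val",
    },
)
# Best model artifacts from the tuning job, usable in later steps
best_model_uri = tuning_step.properties.BestTrainingJob.ModelArtifacts.S3ModelArtifacts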
"""class ConditionStep:
"""
Conditional execution step.
Parameters:
name: str - Step name (required, unique)
conditions: List[Condition] - Conditions to evaluate (required)
- All conditions must be True for if_steps
- Supports: ConditionEquals, ConditionGreaterThan, etc.
if_steps: List[Step] - Steps to execute if conditions are True (required)
else_steps: Optional[List[Step]] - Steps to execute if conditions are False
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Notes:
- All conditions combined with AND logic
- Use ConditionOr for OR logic
- Can nest conditional steps
- Steps in if_steps/else_steps can have their own dependencies
- Only executed branch contributes to execution time/cost
"""Conditional Pipeline Example:
from sagemaker.mlops.workflow import Pipeline, TrainingStep, ConditionStep, ModelStep, FailStep
from sagemaker.core.workflow import ConditionGreaterThan, ParameterFloat
# Define accuracy threshold parameter
accuracy_threshold = ParameterFloat(name="AccuracyThreshold", default_value=0.9)
# Training step
train_step = TrainingStep(name="Train", estimator=trainer)
# Model registration step
register_step = ModelStep(
name="RegisterModel",
step_args=builder.register_args(
model_package_group_name="models",
approval_status="PendingManualApproval"
)
)
# Failure step
fail_step = FailStep(
name="TrainingFailed",
error_message="Model accuracy below threshold"
)
# Condition based on training accuracy
accuracy_condition = ConditionGreaterThan(
left=train_step.properties.FinalMetricDataList["validation:accuracy"].Value,
right=accuracy_threshold
)
# Conditional registration
condition_step = ConditionStep(
name="CheckAccuracy",
conditions=[accuracy_condition],
if_steps=[register_step], # Register if accuracy good
else_steps=[fail_step] # Fail if accuracy low
)
# Create conditional pipeline
pipeline = Pipeline(
name="conditional-pipeline",
parameters=[accuracy_threshold],
steps=[train_step, condition_step]
)
# Execute with custom threshold
execution = pipeline.start(
parameters={"AccuracyThreshold": 0.95}
)
class ModelStep:
"""
Model creation step.
Parameters:
name: str - Step name (required, unique)
step_args: Dict - Step arguments from ModelBuilder (required)
- Obtained from builder.build_args() or builder.register_args()
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to model outputs
- properties.ModelName: Created model name
- properties.PrimaryContainer.Image: Container image URI
Notes:
- Use to create SageMaker Model resource in pipeline
- Often follows training step
- Model can be deployed in subsequent steps
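Example (a minimal sketch; `builder` is a ModelBuilder configured elsewhere, and the build_args() arguments are omitted):
from sagemaker.mlops.workflow import ModelStep

model_step = ModelStep(
    name="CreateModel",
    step_args=builder.build_args(),  # ModelBuilder placeholder; arguments omitted
    depends_on=[train_step],
)
# model_step.properties.ModelName can be referenced by deployment steps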
"""class LambdaStep:
"""
AWS Lambda function step.
Parameters:
name: str - Step name (required, unique)
lambda_func: Union[Lambda, str] - Lambda function or ARN (required)
- Lambda object
- Function ARN: "arn:aws:lambda:region:account:function:name"
inputs: Dict - Lambda function inputs (required)
- JSON-serializable dictionary
- Can include pipeline variables
outputs: Optional[List[LambdaOutput]] - Lambda outputs
- Define output variables from Lambda response
cache_config: Optional[CacheConfig] - Caching configuration
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to Lambda outputs
- properties.OutputName: Defined output values
Notes:
- Lambda timeout: maximum 15 minutes
- Use for custom logic not available in SageMaker
- Lambda must return JSON-serializable output
- Lambda execution role needs permissions for SageMaker API calls
"""Usage:
from sagemaker.mlops.workflow import LambdaStep, LambdaOutput, ConditionStep
from sagemaker.core.workflow import ConditionEquals
# Define Lambda step for custom validation
validate_step = LambdaStep(
name="ValidateData",
lambda_func="arn:aws:lambda:us-west-2:123:function:validate-data",
inputs={
"data_uri": process_step.properties.ProcessingOutputConfig.Outputs["output"].S3Output.S3Uri,
"schema_uri": "s3://bucket/schema.json"
},
outputs=[
LambdaOutput(output_name="is_valid", output_type="Boolean"),
LambdaOutput(output_name="error_count", output_type="Integer")
]
)
# Use Lambda outputs in condition
is_valid = validate_step.properties.Outputs["is_valid"]
condition = ConditionEquals(left=is_valid, right=True)
condition_step = ConditionStep(
name="CheckValidation",
conditions=[condition],
if_steps=[train_step],
else_steps=[fail_step]
)
class CallbackStep:
"""
Custom callback step for external integrations.
Parameters:
name: str - Step name (required, unique)
sqs_queue_url: str - SQS queue URL for callbacks (required)
- Format: "https://sqs.region.amazonaws.com/account/queue-name"
inputs: Dict - Callback inputs (required)
- Sent to SQS queue
- JSON-serializable
outputs: Optional[List[CallbackOutput]] - Callback outputs
- Define expected outputs from external system
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to callback outputs
- properties.OutputName: Output values from callback response
Notes:
- Pipeline pauses until callback received
- External system must send response to SQS queue
- Response format: {"token": "...", "output": {...}}
- Maximum wait time: 7 days
- Use for manual approval, external validation, etc.
"""Usage:
from sagemaker.mlops.workflow import CallbackStep, CallbackOutput, ConditionStep
from sagemaker.core.workflow import ConditionEquals
# Create SQS queue for callbacks
import boto3
sqs = boto3.client('sqs')
queue_url = sqs.create_queue(QueueName='pipeline-callbacks')['QueueUrl']
# Define callback step for manual approval
approval_step = CallbackStep(
name="ManualApproval",
sqs_queue_url=queue_url,
inputs={
"model_metrics": train_step.properties.FinalMetricDataList,
"model_uri": train_step.properties.ModelArtifacts.S3ModelArtifacts
},
outputs=[
CallbackOutput(output_name="approved", output_type="Boolean"),
CallbackOutput(output_name="comments", output_type="String")
]
)
# External system sends approval
# Response format:
# {
# "token": "<callback_token_from_sqs_message>",
# "output": {
# "approved": true,
# "comments": "Model approved for production"
# }
# }
# Continue based on approval
condition = ConditionEquals(
left=approval_step.properties.Outputs["approved"],
right=True
)
condition_step = ConditionStep(
name="CheckApproval",
conditions=[condition],
if_steps=[deploy_step],
else_steps=[fail_step]
)
class FailStep:
"""
Explicit failure step for error handling.
Parameters:
name: str - Step name (required, unique)
error_message: Union[str, PipelineVariable] - Error message (required)
- Static string or pipeline variable
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Notes:
- Explicitly fails pipeline execution with custom message
- Use in else_steps of conditional logic
- Execution marked as Failed with provided error message
- Useful for validation failures or unmet requirements
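Example (a minimal sketch; Join and ExecutionVariables are used here as in the complete example later in this section, and the FailStep import path is assumed to match the other step classes):
from sagemaker.mlops.workflow import FailStep  # import path assumed
from sagemaker.core.workflow import Join, ExecutionVariables

fail_step = FailStep(
    name="DataValidationFailed",
    error_message=Join(
        on=" ",
        values=["Validation failed for execution", ExecutionVariables.PIPELINE_EXECUTION_ID],
    ),
)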
"""class QualityCheckStep:
"""
Model/data quality check step.
Parameters:
name: str - Step name (required, unique)
check_job_config: CheckJobConfig - Check job configuration (required)
quality_check_config: QualityCheckConfig - Quality configuration (required)
model_package_group_name: Optional[str] - Model package group for baselines
skip_check: Optional[bool] - Skip check (default: False)
register_new_baseline: Optional[bool] - Register new baseline (default: False)
supplied_baseline_constraints: Optional[str] - S3 URI for existing baseline
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to quality check outputs
- properties.CalculatedBaselineConstraints: S3 URI
- properties.BaselineUsedForDriftCheckConstraints: S3 URI
Notes:
- Validates data or model quality against baseline
- Can register first run as baseline
- Subsequent runs check drift from baseline
- Fails pipeline if quality below threshold
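Example (a minimal sketch; `check_job_config` is a CheckJobConfig as documented below, `data_quality_config` is an assumed QualityCheckConfig instance, and the import path is assumed):
from sagemaker.mlops.workflow import QualityCheckStep  # import path assumed

data_quality_step = QualityCheckStep(
    name="DataQualityCheck",
    check_job_config=check_job_config,         # CheckJobConfig (see below)
    quality_check_config=data_quality_config,  # assumed QualityCheckConfig instance
    model_package_group_name="models",
    skip_check=False,
    register_new_baseline=False,
)
# The first run can register a baseline; later runs compare against it for drift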
"""class ClarifyCheckStep:
"""
Bias and explainability check step.
Parameters:
name: str - Step name (required, unique)
clarify_check_config: Dict - Clarify configuration (required)
- bias_config or explainability_config
check_job_config: CheckJobConfig - Check job configuration (required)
skip_check: Optional[bool] - Skip check (default: False)
register_new_baseline: Optional[bool] - Register new baseline (default: False)
supplied_baseline_constraints: Optional[str] - S3 URI for baseline
model_package_group_name: Optional[str] - Model package group
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to Clarify check outputs
- properties.CalculatedBaselineConstraints: Bias/explainability baseline
- properties.ViolationReport: S3 URI if violations detected
Notes:
- Checks model bias or explainability
- Fails pipeline if bias exceeds threshold
- Use after training or before deployment
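Example (a minimal sketch; `bias_check_config` is an assumed Clarify bias configuration and the import path is assumed):
from sagemaker.mlops.workflow import ClarifyCheckStep  # import path assumed

bias_check_step = ClarifyCheckStep(
    name="BiasCheck",
    clarify_check_config=bias_check_config,  # assumed bias_config-style configuration
    check_job_config=check_job_config,       # CheckJobConfig (see below)
    model_package_group_name="models",
    skip_check=False,
    register_new_baseline=False,
)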
"""class AutoMLStep:
"""
AutoML training step.
Parameters:
name: str - Step name (required, unique)
step_args: Dict - AutoML step arguments (required)
- From AutoML.get_step_args()
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to AutoML outputs
- properties.BestCandidate: Best model candidate
- properties.BestCandidate.ModelInsightsPath: Model insights
Notes:
- Automatically finds best model and hyperparameters
- Longer execution time than manual training
- Use for baseline models or when ML expertise is limited
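Example (a minimal sketch; `automl` is an assumed AutoML estimator exposing get_step_args(), and the import path is assumed):
from sagemaker.mlops.workflow import AutoMLStep  # import path assumed

automl_step = AutoMLStep(
    name="AutoMLTraining",
    step_args=automl.get_step_args(),  # AutoML placeholder
)
# Insights for the best candidate, usable in later steps
insights_path = automl_step.properties.BestCandidate.ModelInsightsPath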
"""class EMRStep:
"""
Amazon EMR step for big data processing.
Parameters:
name: str - Step name (required, unique)
cluster_id: Union[str, PipelineVariable] - EMR cluster ID (required)
- Existing cluster ID or pipeline variable
step_config: Union[EMRStepConfig, Dict] - EMR step configuration (required)
- Jar, HadoopJarStep, or SparkStep config
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to EMR step outputs
- properties.ClusterId: EMR cluster ID
- properties.StepId: EMR step ID
Notes:
- Requires existing EMR cluster
- Use for large-scale data processing with Spark/Hadoop
- EMR cluster must be in same region as pipeline
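Example (a minimal sketch; the cluster ID is a placeholder and the EMRStepConfig arguments shown are assumptions):
from sagemaker.mlops.workflow import EMRStep, EMRStepConfig  # import path assumed

emr_step = EMRStep(
    name="SparkFeatureExtraction",
    cluster_id="j-1ABCDEFGHIJKL",  # existing EMR cluster (placeholder)
    step_config=EMRStepConfig(
        jar="command-runner.jar",  # argument names assumed
        args=["spark-submit", "s3://bucket/scripts/extract_features.py"],
    ),
)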
"""class NotebookJobStep:
"""
Notebook execution step.
Parameters:
name: str - Step name (required, unique)
notebook_job_name: str - Notebook job name (required)
input_notebook: str - S3 URI of input notebook (required)
image_uri: str - Container image URI (required)
kernel_name: str - Jupyter kernel name (required)
role: str - IAM role ARN (required)
instance_type: str - Instance type (required)
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to notebook outputs
- properties.OutputNotebookLocation: S3 URI of executed notebook
Notes:
- Executes Jupyter notebooks in pipeline
- Useful for exploratory analysis in automated workflows
- Output notebook contains execution results
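Example (a minimal sketch; all values are placeholders and the import path is assumed):
from sagemaker.mlops.workflow import NotebookJobStep  # import path assumed

notebook_step = NotebookJobStep(
    name="ExploratoryAnalysis",
    notebook_job_name="daily-analysis",
    input_notebook="s3://bucket/notebooks/analysis.ipynb",
    image_uri="123456789012.dkr.ecr.us-west-2.amazonaws.com/notebook-image:latest",
    kernel_name="python3",
    role="arn:aws:iam::123456789012:role/SageMakerPipelineRole",
    instance_type="ml.m5.xlarge",
)
# notebook_step.properties.OutputNotebookLocation points at the executed notebook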
"""class MonitorBatchTransformStep:
"""
Batch transform with monitoring.
Parameters:
name: str - Step name (required, unique)
transform_step_name: str - Transform step to monitor (required)
monitor_configuration: Dict - Monitoring configuration (required)
check_job_configuration: CheckJobConfig - Check job configuration (required)
depends_on: Optional[List[Union[str, Step]]] - Dependencies
Properties:
properties: Properties - Access to monitoring outputs
- properties.CalculatedBaselineConstraints: Baseline
- properties.ViolationReport: Violations if any
Notes:
- Monitors transform job outputs for quality/drift
- Fails if violations detected
- Use for production batch inference monitoring
"""class PipelineExperimentConfig:
"""
Experiment configuration for pipelines.
Fields:
experiment_name: Union[str, PipelineVariable] - Experiment name (required)
- Static string or ExecutionVariables.PIPELINE_NAME
trial_name: Union[str, PipelineVariable] - Trial name (required)
- Typically ExecutionVariables.PIPELINE_EXECUTION_ID for unique trials
Notes:
- Links each pipeline execution to an experiment trial
- Enables tracking pipeline runs in Experiments
- Trial automatically created per execution
"""Usage:
from sagemaker.mlops.workflow import PipelineExperimentConfig
from sagemaker.core.workflow import ExecutionVariables
# Configure experiment tracking
experiment_config = PipelineExperimentConfig(
experiment_name="my-experiment",
trial_name=ExecutionVariables.PIPELINE_EXECUTION_ID # Unique per execution
)
pipeline = Pipeline(
name="experiment-pipeline",
steps=[train_step],
pipeline_experiment_config=experiment_config
)
# Each execution creates new trial in experiment
execution1 = pipeline.start() # Creates trial-1
execution2 = pipeline.start()  # Creates trial-2
class ParallelismConfiguration:
"""
Parallelism configuration for pipeline execution.
Fields:
max_parallel_execution_steps: int - Maximum parallel steps (required)
- Range: 1-50
- Limits number of steps running simultaneously
Notes:
- Controls resource usage
- Steps without dependencies run in parallel up to limit
- Does not affect steps with dependencies (run in sequence)
- Higher parallelism = faster execution but more concurrent resources
"""Usage:
from sagemaker.mlops.workflow import ParallelismConfiguration
# Limit concurrent step execution
parallelism_config = ParallelismConfiguration(
max_parallel_execution_steps=3
)
# Create pipeline with 6 independent processing steps
pipeline = Pipeline(
name="parallel-pipeline",
steps=[proc1, proc2, proc3, proc4, proc5, proc6], # All independent
parallelism_config=parallelism_config
)
# Executes 3 steps at a time:
# Round 1: proc1, proc2, proc3
# Round 2: proc4, proc5, proc6
class CacheConfig:
"""
Step caching configuration.
Parameters:
enable_caching: bool - Enable caching (required)
expire_after: Optional[str] - Cache expiration (ISO 8601 duration)
- Format: "PT1H" (1 hour), "P7D" (7 days), "P30D" (30 days)
- Default: cache never expires
Notes:
- Caches step output based on input data and configuration
- Skips execution if inputs unchanged and cache valid
- Saves time and cost for expensive steps
- Cache invalidated if:
- Inputs changed
- Step configuration changed
- Cache expired
"""Usage:
from sagemaker.mlops.workflow import ProcessingStep, CacheConfig
# Enable caching for expensive preprocessing
cache_config = CacheConfig(
enable_caching=True,
expire_after="P7D" # Cache for 7 days
)
process_step = ProcessingStep(
name="ExpensivePreprocessing",
processor=processor,
inputs=[...],
outputs=[...],
cache_config=cache_config
)
# First execution: runs processing
execution1 = pipeline.start()
# Second execution with same inputs: uses cache (skips processing)
execution2 = pipeline.start()
# Execution after 7 days: cache expired, runs processing
# Or if inputs changed: cache invalid, runs processing
class SelectiveExecutionConfig:
"""
Selective execution configuration.
Fields:
source_pipeline_execution_arn: str - Source pipeline execution ARN (required)
- Previous execution to start from
selected_steps: List[SelectedStep] - Steps to execute (required)
- Specific steps to re-run
Notes:
- Re-run only specific steps from failed execution
- Useful for debugging and iterative development
- Steps must exist in source execution
- Dependent steps automatically included
"""Usage:
# Pipeline execution failed at evaluation step
failed_execution_arn = "arn:aws:sagemaker:us-west-2:123:pipeline/my-pipeline/execution/abc"
# Re-run only evaluation and subsequent steps
from sagemaker.core.shapes import SelectedStep
selective_config = SelectiveExecutionConfig(
source_pipeline_execution_arn=failed_execution_arn,
selected_steps=[
SelectedStep(step_name="EvaluateModel"),
SelectedStep(step_name="RegisterModel")
]
)
# Start selective execution
execution = pipeline.start(
execution_description="Re-run evaluation only",
selective_execution_config=selective_config
)
class CheckJobConfig:
"""
Configuration for check jobs (quality, bias, explainability).
Fields:
role: str - IAM role ARN (required)
instance_count: int - Number of instances (default: 1)
instance_type: str - Instance type (required)
volume_size_in_gb: int - EBS volume size (default: 30)
volume_kms_key: Optional[str] - KMS key for volume encryption
output_s3_uri: str - S3 output URI (required)
max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 3600)
base_job_name: Optional[str] - Base job name
sagemaker_session: Optional[Session] - SageMaker session
env: Optional[Dict[str, str]] - Environment variables
tags: Optional[List[Tag]] - Resource tags
network_config: Optional[NetworkConfig] - Network configuration
Notes:
- Used by QualityCheckStep and ClarifyCheckStep
- Provisions resources for running checks
- Results saved to output_s3_uri
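Example (a minimal sketch; values are placeholders and the import path is assumed):
from sagemaker.mlops.workflow import CheckJobConfig  # import path assumed

check_job_config = CheckJobConfig(
    role="arn:aws:iam::123456789012:role/SageMakerPipelineRole",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=30,
    output_s3_uri="s3://bucket/check-results",
    max_runtime_in_seconds=3600,
)
# Pass to QualityCheckStep or ClarifyCheckStep as check_job_config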
"""class StepRetryPolicy:
"""
Retry policy for pipeline steps.
Parameters:
exception_types: List[StepExceptionTypeEnum] - Exception types to retry (required)
interval_seconds: int - Interval between retries (required)
- Range: 1-300 seconds
backoff_rate: float - Backoff rate multiplier (required)
- Range: 1.0-2.0
- Next interval = current interval * backoff_rate
max_attempts: int - Maximum retry attempts (required)
- Range: 1-10
expire_after_mins: Optional[int] - Expiration time in minutes
- Stop retrying after this time regardless of attempts
Notes:
- Retries for infrastructure failures (throttling, service faults)
- Does not retry application errors
- Backoff prevents overwhelming services
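Example (a minimal sketch; the import path is assumed to match SageMakerJobStepRetryPolicy below):
from sagemaker.mlops.workflow import StepRetryPolicy, StepExceptionTypeEnum  # import path assumed

throttle_retry = StepRetryPolicy(
    exception_types=[
        StepExceptionTypeEnum.STEP_SERVICE_FAULT,
        StepExceptionTypeEnum.STEP_THROTTLING,
    ],
    interval_seconds=30,   # first retry after 30 seconds
    backoff_rate=2.0,      # 30s, 60s, 120s, ...
    max_attempts=4,
    expire_after_mins=30,  # stop retrying after 30 minutes
)
# Attach via retry_policies=[throttle_retry] on any step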
"""class SageMakerJobStepRetryPolicy:
"""
Retry policy for SageMaker job steps (Training, Processing, Transform).
Parameters:
exception_types: List[SageMakerJobExceptionTypeEnum] - Exception types (required)
- CAPACITY_ERROR: Insufficient instance capacity
- INTERNAL_ERROR: Internal service errors
- RESOURCE_LIMIT: Resource limits exceeded
interval_seconds: int - Interval between retries (required)
backoff_rate: float - Backoff rate multiplier (required)
max_attempts: int - Maximum retry attempts (required)
expire_after_mins: Optional[int] - Expiration time
Notes:
- Specific to SageMaker job failures
- Retries capacity errors (common with spot/GPU instances)
- Use for production pipelines requiring resilience
"""Usage:
from sagemaker.mlops.workflow import (
TrainingStep,
SageMakerJobStepRetryPolicy,
SageMakerJobExceptionTypeEnum
)
# Configure retry for capacity errors
retry_policy = SageMakerJobStepRetryPolicy(
exception_types=[
SageMakerJobExceptionTypeEnum.CAPACITY_ERROR,
SageMakerJobExceptionTypeEnum.INTERNAL_ERROR
],
interval_seconds=60, # Start with 1 minute
backoff_rate=2.0, # Double each retry (1min, 2min, 4min, ...)
max_attempts=5,
expire_after_mins=60 # Stop after 1 hour total
)
# Apply to training step
train_step = TrainingStep(
name="TrainWithRetry",
estimator=trainer,
retry_policies=[retry_policy]
)
# Pipeline automatically retries on capacity errors
pipeline = Pipeline(name="resilient-pipeline", steps=[train_step])class StepExceptionTypeEnum(Enum):
"""
Step exception types for retry policies.
Values:
STEP_SERVICE_FAULT = "Step.SERVICE_FAULT"
- Internal service faults
- Transient errors from SageMaker service
STEP_THROTTLING = "Step.THROTTLING"
- API throttling errors
- Too many concurrent API calls
Notes:
- These are general step-level errors
- Different from SageMaker job-specific errors
- Typically transient and benefit from retry
"""class SageMakerJobExceptionTypeEnum(Enum):
"""
SageMaker job exception types.
Values:
CAPACITY_ERROR = "SageMaker.CAPACITY_ERROR"
- Insufficient capacity for requested instances
- Common with GPU instances or spot
- Retry often succeeds
INTERNAL_ERROR = "SageMaker.INTERNAL_ERROR"
- Internal service errors
- Transient errors
- Retry recommended
RESOURCE_LIMIT = "SageMaker.RESOURCE_LIMIT"
- Resource limits exceeded (service quotas)
- Retry may not help unless quota increased
- Check service quotas
Notes:
- Specific to SageMaker job steps
- CAPACITY_ERROR most common, should always retry
- RESOURCE_LIMIT may require quota increase
"""class Trigger:
"""
Pipeline trigger configuration.
Parameters:
source_pipeline_execution_status: str - Source execution status (required)
- "Succeeded": Trigger on successful completion
- "Failed": Trigger on failure
- "Stopped": Trigger when stopped
Notes:
- Triggers another pipeline based on execution status
- Use for chaining pipelines
- Example: trigger evaluation pipeline after training succeeds
"""class PipelineSchedule:
"""
Schedule-based pipeline trigger.
Parameters:
cron_expression: str - Cron expression (required)
- Format: "cron(0 12 * * ? *)" (daily at noon)
- UTC timezone
state: str - Schedule state (required)
- "ENABLED": Schedule active
- "DISABLED": Schedule inactive
Notes:
- Automatically starts pipeline executions on schedule
- Use for periodic training, data processing, etc.
- Cron format: minute hour day month day-of-week year
- All times in UTC
"""Usage:
# Schedule pipeline for daily execution
from sagemaker.mlops.workflow import Pipeline
import boto3
pipeline.create(role_arn=role)
# Create EventBridge rule for scheduling
events = boto3.client('events')
events.put_rule(
Name='daily-training-pipeline',
ScheduleExpression='cron(0 2 * * ? *)', # 2 AM UTC daily
State='ENABLED',
Description='Daily model retraining'
)
# Add target to start pipeline
events.put_targets(
Rule='daily-training-pipeline',
Targets=[{
'Id': '1',
'Arn': f'arn:aws:sagemaker:{region}:{account}:pipeline/{pipeline.name}',
'RoleArn': events_role_arn,
'SageMakerPipelineParameters': {
'PipelineParameterList': [
{'Name': 'InputData', 'Value': 's3://bucket/daily-data'}
]
}
}]
)
Complete Pipeline Example:
from sagemaker.mlops.workflow import (
    Pipeline, ProcessingStep, TrainingStep, TuningStep,
    ConditionStep, ModelStep, TransformStep, LambdaStep, FailStep,
    CacheConfig, ParallelismConfiguration
)
from sagemaker.core.workflow import (
    ParameterString, ParameterFloat, ConditionGreaterThan, Join, ExecutionVariables
)
from datetime import datetime
# Parameters
data_path = ParameterString(name="InputData", default_value="s3://bucket/data")
accuracy_threshold = ParameterFloat(name="AccuracyThreshold", default_value=0.90)
# Dynamic output path
output_path = Join(
on="/",
values=["s3://bucket/pipelines", ExecutionVariables.PIPELINE_EXECUTION_ID]
)
# Step 1: Data validation
validate_step = ProcessingStep(
name="ValidateData",
processor=validation_processor,
inputs=[ProcessingInput(source=data_path)],
outputs=[ProcessingOutput(
output_name="validated",
destination=Join(on="/", values=[output_path, "validated"])
)]
)
# Step 2: Feature engineering
feature_step = ProcessingStep(
name="FeatureEngineering",
processor=feature_processor,
inputs=[ProcessingInput(
source=validate_step.properties.ProcessingOutputConfig.Outputs["validated"].S3Output.S3Uri
)],
outputs=[
ProcessingOutput(output_name="train", destination=Join(on="/", values=[output_path, "train"])),
ProcessingOutput(output_name="val", destination=Join(on="/", values=[output_path, "val"])),
ProcessingOutput(output_name="test", destination=Join(on="/", values=[output_path, "test"]))
],
cache_config=CacheConfig(enable_caching=True, expire_after="P7D")
)
# Step 3: Hyperparameter tuning
tuning_step = TuningStep(
name="TuneModel",
tuner=tuner,
inputs={
"training": feature_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
"validation": feature_step.properties.ProcessingOutputConfig.Outputs["val"].S3Output.S3Uri
}
)
# Step 4: Evaluate best model
eval_step = ProcessingStep(
name="EvaluateModel",
processor=eval_processor,
inputs=[
ProcessingInput(
source=tuning_step.properties.BestTrainingJob.ModelArtifacts.S3ModelArtifacts
),
ProcessingInput(
source=feature_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
)
],
outputs=[ProcessingOutput(output_name="metrics", destination=Join(on="/", values=[output_path, "metrics"]))],
property_files=[
PropertyFile(name="EvalMetrics", output_name="metrics", path="evaluation.json")
]
)
# Step 5: Register if metrics good
register_step = ModelStep(
name="RegisterModel",
step_args=builder.register_args(
model_package_group_name="models",
approval_status="PendingManualApproval"
)
)
# Step 6: Deploy to staging
deploy_step = LambdaStep(
name="DeployToStaging",
lambda_func=deploy_lambda_arn,
inputs={
"model_package_arn": register_step.properties.ModelPackageArn,
"endpoint_name": "staging-endpoint"
}
)
# Conditional logic
accuracy = JsonGet(
step_name=eval_step.name,
property_file=eval_step.property_files[0],
json_path="metrics.accuracy"
)
condition = ConditionGreaterThan(left=accuracy, right=accuracy_threshold)
condition_step = ConditionStep(
name="CheckAccuracy",
conditions=[condition],
if_steps=[register_step, deploy_step],
else_steps=[FailStep(name="LowAccuracy", error_message="Model accuracy below threshold")]
)
# Create complete pipeline
pipeline = Pipeline(
name="production-ml-pipeline",
parameters=[data_path, accuracy_threshold],
steps=[
validate_step,
feature_step,
tuning_step,
eval_step,
condition_step
],
parallelism_config=ParallelismConfiguration(max_parallel_execution_steps=5),
tags=[{"Key": "Environment", "Value": "Production"}]
)
# Create and execute
pipeline.upsert(
role_arn=pipeline_role,
description="End-to-end ML pipeline with validation and conditional deployment"
)
execution = pipeline.start(
parameters={
"InputData": "s3://bucket/latest-data",
"AccuracyThreshold": 0.92
},
execution_display_name=f"Execution-{datetime.now().isoformat()}"
)
# Monitor execution
execution.wait(poll=30)
# Get step details
status = execution.describe()
for step in status['PipelineExecutionSteps']:
print(f"{step['StepName']}: {step['StepStatus']}")
if step.get('FailureReason'):
print(f" Failure: {step['FailureReason']}")# Create multiple independent preprocessing steps
preprocess_steps = []
for i, data_source in enumerate(data_sources):
step = ProcessingStep(
name=f"Preprocess-{i}",
processor=processor,
inputs=[ProcessingInput(source=data_source)],
outputs=[ProcessingOutput(
output_name="output",
destination=f"s3://bucket/processed-{i}"
)]
)
preprocess_steps.append(step)
# Training depends on all preprocessing completing
train_step = TrainingStep(
name="Train",
estimator=trainer,
depends_on=preprocess_steps # Waits for all
)
# All preprocessing steps run in parallel (up to parallelism limit)
pipeline = Pipeline(
name="parallel-preprocessing",
steps=preprocess_steps + [train_step],
parallelism_config=ParallelismConfiguration(
max_parallel_execution_steps=len(data_sources)
)
)
Common Issues:
Circular Dependency Detected:
Step Name Conflict:
Invalid Parameter Reference:
Property Not Available:
Cache Miss on Sensitive Data:
Execution Timeout: