Workflow Primitives

Pipeline workflow primitives including parameters, conditions, functions, properties, and execution variables for building dynamic ML workflows.

Capabilities

Parameters

Pipeline parameters allow passing values at execution time for dynamic workflows.

ParameterString

class ParameterString:
    """
    String parameter for pipelines.

    Parameters:
        name: str - Parameter name (required)
            - 1-256 characters
            - Must start with letter
            - Alphanumeric and underscores only
        default_value: Optional[str] - Default value
            - Maximum 1024 characters
            - Used if not provided at execution time
        enum_values: Optional[List[str]] - Allowed values
            - Constrains input to specific strings
            - Execution fails if value not in list

    Usage:
        Use as placeholder for string values that vary between executions.
        Common use cases: S3 paths, configuration values, resource names.
    
    Notes:
        - Parameters immutable during execution
        - Validation happens before pipeline starts
        - Can be used in step configurations and conditions
    """

ParameterInteger

class ParameterInteger:
    """
    Integer parameter for pipelines.

    Parameters:
        name: str - Parameter name (required)
        default_value: Optional[int] - Default value
        min_value: Optional[int] - Minimum allowed value (inclusive)
        max_value: Optional[int] - Maximum allowed value (inclusive)

    Usage:
        Use for integer values like instance counts, epochs, batch sizes.
    
    Notes:
        - Value must be integer (not float)
        - Range validation at execution start
        - Can be used in arithmetic expressions
    """

ParameterFloat

class ParameterFloat:
    """
    Float parameter for pipelines.

    Parameters:
        name: str - Parameter name (required)
        default_value: Optional[float] - Default value
        min_value: Optional[float] - Minimum allowed value (inclusive)
        max_value: Optional[float] - Maximum allowed value (inclusive)

    Usage:
        Use for floating-point values like learning rates, thresholds, ratios.
    
    Notes:
        - Accepts float or integer (converted to float)
        - Range validation at execution start
        - Can be used in comparisons
    """

ParameterBoolean

class ParameterBoolean:
    """
    Boolean parameter for pipelines.

    Parameters:
        name: str - Parameter name (required)
        default_value: Optional[bool] - Default value

    Usage:
        Use for boolean flags like enable/disable features, debug modes.
    
    Notes:
        - Accepts: true/false, True/False, 1/0
        - Coerced to boolean
        - Can be used in conditional logic
    """

Usage:

from sagemaker.core.workflow import ParameterString, ParameterInteger, ParameterFloat, ParameterBoolean
from sagemaker.mlops.workflow import Pipeline, TrainingStep

# Define parameters with validation
instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge",
    enum_values=["ml.m5.xlarge", "ml.m5.2xlarge", "ml.p3.2xlarge", "ml.p3.8xlarge"]
)

instance_count = ParameterInteger(
    name="InstanceCount",
    default_value=1,
    min_value=1,
    max_value=10
)

learning_rate = ParameterFloat(
    name="LearningRate",
    default_value=0.001,
    min_value=0.0001,
    max_value=0.1
)

enable_spot = ParameterBoolean(
    name="EnableSpotInstances",
    default_value=True
)

data_path = ParameterString(
    name="InputDataPath",
    default_value="s3://my-bucket/default-data"
)

# Use parameters in pipeline (ModelTrainer and Compute as defined in the training docs)
trainer = ModelTrainer(
    compute=Compute(
        instance_type=instance_type,
        instance_count=instance_count,
        enable_managed_spot_training=enable_spot
    ),
    hyperparameters={
        "learning_rate": learning_rate
    }
)

train_step = TrainingStep(
    name="Train",
    estimator=trainer,
    inputs={"training": data_path}
)

# Create parameterized pipeline
pipeline = Pipeline(
    name="parameterized-pipeline",
    parameters=[instance_type, instance_count, learning_rate, enable_spot, data_path],
    steps=[train_step]
)

pipeline.create(role_arn=role)

# Execute with custom values
try:
    execution = pipeline.start(
        parameters={
            "TrainingInstanceType": "ml.p3.8xlarge",
            "InstanceCount": 4,
            "LearningRate": 0.01,
            "EnableSpotInstances": False,
            "InputDataPath": "s3://my-bucket/experiment-1/data"
        }
    )
except ValueError as e:
    print(f"Invalid parameter value: {e}")
    # Example: "TrainingInstanceType value 'ml.invalid' not in enum_values"

# Execute with defaults
execution = pipeline.start()  # Uses default values

Conditions

Conditional expressions for branching logic in pipelines.

ConditionEquals

class ConditionEquals:
    """
    Equality comparison condition.

    Parameters:
        left: Union[PipelineVariable, Any] - Left operand (required)
        right: Union[PipelineVariable, Any] - Right operand (required)

    Returns:
        Condition: Boolean condition evaluating left == right

    Supported Types:
        - String, Integer, Float comparisons
        - Pipeline variables (parameters, properties)
        - Literal values

    Notes:
        - Type coercion for numeric comparisons
        - String comparison case-sensitive
    """

ConditionGreaterThan

class ConditionGreaterThan:
    """
    Greater than comparison.

    Parameters:
        left: Union[PipelineVariable, Any] - Left operand (required)
        right: Union[PipelineVariable, Any] - Right operand (required)

    Returns:
        Condition: Boolean condition evaluating left > right

    Notes:
        - Numeric comparison only
        - Both operands must be comparable types
    """

ConditionGreaterThanOrEqualTo

class ConditionGreaterThanOrEqualTo:
    """
    Greater than or equal comparison.

    Parameters:
        left: Union[PipelineVariable, Any] - Left operand (required)
        right: Union[PipelineVariable, Any] - Right operand (required)

    Returns:
        Condition: Boolean condition evaluating left >= right
    """

ConditionLessThan

class ConditionLessThan:
    """
    Less than comparison.

    Parameters:
        left: Union[PipelineVariable, Any] - Left operand (required)
        right: Union[PipelineVariable, Any] - Right operand (required)

    Returns:
        Condition: Boolean condition evaluating left < right
    """

ConditionLessThanOrEqualTo

class ConditionLessThanOrEqualTo:
    """
    Less than or equal comparison.

    Parameters:
        left: Union[PipelineVariable, Any] - Left operand (required)
        right: Union[PipelineVariable, Any] - Right operand (required)

    Returns:
        Condition: Boolean condition evaluating left <= right
    """

ConditionIn

class ConditionIn:
    """
    Membership check condition.

    Parameters:
        value: Union[PipelineVariable, Any] - Value to check (required)
        in_values: List[Union[PipelineVariable, Any]] - List to check against (required)

    Returns:
        Condition: Boolean condition evaluating value in in_values

    Notes:
        - in_values can be literal list or pipeline variable
        - Type-safe comparison
    """

ConditionNot

class ConditionNot:
    """
    Logical NOT condition.

    Parameters:
        condition: Condition - Condition to negate (required)

    Returns:
        Condition: Boolean condition evaluating NOT condition
    """

ConditionOr

class ConditionOr:
    """
    Logical OR condition.

    Parameters:
        conditions: List[Condition] - Conditions to combine with OR (required)
            - Minimum 2 conditions

    Returns:
        Condition: Boolean condition evaluating conditions[0] OR conditions[1] OR ...

    Notes:
        - Short-circuit evaluation: stops at first True
        - All conditions must be valid Condition objects
    """

Usage:

from sagemaker.core.workflow import (
    ConditionGreaterThan, ConditionEquals, ConditionOr, ConditionNot,
    ConditionIn, ParameterFloat, ParameterString
)
from sagemaker.mlops.workflow import ConditionStep, FailStep

# Define parameters
accuracy_threshold = ParameterFloat(name="AccuracyThreshold", default_value=0.9)
deployment_env = ParameterString(name="Environment", default_value="staging")

# Simple condition
high_accuracy = ConditionGreaterThan(
    left=train_step.properties.FinalMetricDataList["validation:accuracy"].Value,
    right=accuracy_threshold
)

# Multiple conditions with OR
success_statuses = ["Completed", "CompletedWithViolations"]
processing_ok = ConditionIn(
    value=process_step.properties.ProcessingJobStatus,
    in_values=success_statuses
)

# Combine with OR
either_condition = ConditionOr(
    conditions=[high_accuracy, processing_ok]
)

# Negate condition
low_accuracy = ConditionNot(condition=high_accuracy)

# Complex nested conditions
production_ready = ConditionOr(
    conditions=[
        ConditionGreaterThan(
            left=train_step.properties.FinalMetricDataList["accuracy"].Value,
            right=0.95
        ),
        ConditionOr(conditions=[
            ConditionEquals(left=deployment_env, right="staging"),
            ConditionEquals(left=deployment_env, right="dev")
        ])
    ]
)

# Use in conditional step
condition_step = ConditionStep(
    name="CheckDeploymentReady",
    conditions=[production_ready],
    if_steps=[deploy_step],
    else_steps=[FailStep(name="NotReady", error_message="Model not ready for deployment")]
)

Functions

Utility functions for manipulating pipeline variables.

Join

class Join:
    """
    Join pipeline variables into a string.

    Parameters:
        on: str - Separator string (required)
        values: List[Union[PipelineVariable, Any]] - Values to join (required)
            - Pipeline variables (parameters, properties, execution variables)
            - Literal strings, numbers
            - Converted to strings and joined

    Methods:
        to_string() -> str
            Convert to JSON string representation for API.
            
            Returns:
                str: JSON representation of join operation

    Notes:
        - All values converted to strings
        - Empty strings and None skipped
        - Common uses: building S3 paths, job names, tags
    """

Usage:

from sagemaker.core.workflow import Join, ExecutionVariables, ParameterString

# Join execution variables for S3 path
base_bucket = ParameterString(name="BaseBucket", default_value="my-ml-bucket")

output_path = Join(
    on="/",
    values=[
        "s3:/",  # the "/" separator supplies the second slash of "s3://"
        base_bucket,
        "pipelines",
        ExecutionVariables.PIPELINE_NAME,
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ]
)
# Result: "s3://my-ml-bucket/pipelines/my-pipeline/execution-abc-123"

# Build job name
job_name = Join(
    on="-",
    values=[
        "training",
        ExecutionVariables.PIPELINE_NAME,
        ExecutionVariables.START_DATETIME
    ]
)
# Result: "training-my-pipeline-2024-01-15T10:30:00Z"

# Build dynamic path from step output
processed_path = Join(
    on="/",
    values=[
        "s3://bucket",
        "processed",
        train_step.properties.TrainingJobName,
        "data"
    ]
)

# Use in step configuration
process_step = ProcessingStep(
    name="Process",
    processor=processor,
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=output_path  # Dynamic path
        )
    ]
)

JsonGet

class JsonGet:
    """
    Extract JSON properties from step outputs or S3 files.

    Parameters:
        step_name: str - Step name containing JSON (required)
        property_file: Union[str, PropertyFile] - Property file reference (required)
        json_path: str - JSON path expression (required)
            - Dot notation: "metrics.accuracy"
            - Array access: "results[0].score"
            - Nested: "model.performance.validation.accuracy"

    Methods:
        to_string() -> str
            Convert to JSON string representation.
            
            Returns:
                str: JSON representation of JsonGet operation

    Notes:
        - Step must define property_files
        - JSON file must exist in step output
        - Path must match JSON structure
        - Returns typed value (string, number, boolean, null)
    
    Raises:
        ValueError: If JSON path invalid or value not found
    """

Usage:

from sagemaker.core.workflow import JsonGet, PropertyFile
from sagemaker.mlops.workflow import ProcessingStep, ConditionStep
from sagemaker.core.workflow import ConditionGreaterThan, ConditionLessThan

# Define property file in processing step
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"  # Relative to output source
)

# Processing step that generates JSON
evaluate_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluator_processor,
    inputs=[
        ProcessingInput(
            source=train_step.properties.ModelArtifacts.S3ModelArtifacts
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination="s3://bucket/evaluation"
        )
    ],
    property_files=[evaluation_report]
)

# Extract values from JSON output
# evaluation.json structure:
# {
#   "metrics": {
#     "accuracy": 0.94,
#     "f1": 0.92,
#     "precision": 0.93
#   },
#   "confusion_matrix": [...],
#   "metadata": {
#     "samples": 1000,
#     "classes": 5
#   }
# }

model_accuracy = JsonGet(
    step_name=evaluate_step.name,
    property_file=evaluation_report,
    json_path="metrics.accuracy"
)

model_f1 = JsonGet(
    step_name=evaluate_step.name,
    property_file=evaluation_report,
    json_path="metrics.f1"
)

sample_count = JsonGet(
    step_name=evaluate_step.name,
    property_file=evaluation_report,
    json_path="metadata.samples"
)

# Use extracted values in conditions
accuracy_condition = ConditionGreaterThan(
    left=model_accuracy,
    right=0.9
)

f1_condition = ConditionGreaterThan(
    left=model_f1,
    right=0.85
)

# Deploy only if both metrics good
condition_step = ConditionStep(
    name="CheckMetrics",
    conditions=[accuracy_condition, f1_condition],  # AND logic
    if_steps=[register_model_step, deploy_step],
    else_steps=[retrain_step]
)

Multiple Property Files:

# Multiple JSON outputs from single step
train_metrics = PropertyFile(
    name="TrainMetrics",
    output_name="metrics",
    path="train_metrics.json"
)

val_metrics = PropertyFile(
    name="ValMetrics",
    output_name="metrics",
    path="val_metrics.json"
)

test_metrics = PropertyFile(
    name="TestMetrics",
    output_name="metrics",
    path="test_metrics.json"
)

evaluate_step = ProcessingStep(
    name="Evaluate",
    processor=processor,
    outputs=[
        ProcessingOutput(
            output_name="metrics",
            source="/opt/ml/processing/metrics",
            destination="s3://bucket/metrics"
        )
    ],
    property_files=[train_metrics, val_metrics, test_metrics]
)

# Extract from different files
train_acc = JsonGet(
    step_name=evaluate_step.name,
    property_file=train_metrics,
    json_path="accuracy"
)

val_acc = JsonGet(
    step_name=evaluate_step.name,
    property_file=val_metrics,
    json_path="accuracy"
)

test_acc = JsonGet(
    step_name=evaluate_step.name,
    property_file=test_metrics,
    json_path="accuracy"
)

# Check for overfitting: conditions cannot subtract pipeline variables, so the
# evaluation script writes the gap itself (hypothetical "accuracy_gap" field in val_metrics.json)
overfitting_check = ConditionLessThan(
    left=JsonGet(
        step_name=evaluate_step.name,
        property_file=val_metrics,
        json_path="accuracy_gap"
    ),
    right=0.05
)

Properties

Access step outputs and metadata through property references.

Properties

class Properties:
    """
    Properties for workflow expressions.

    Provides access to step output properties:
    
    Training Step Properties:
        - TrainingJobName: str - Training job name
        - ModelArtifacts.S3ModelArtifacts: str - Model S3 URI
        - FinalMetricDataList[metric_name].Value: float - Metric values
        - TrainingStartTime: datetime - Start time
        - TrainingEndTime: datetime - End time
        - TrainingJobStatus: str - Job status
    
    Processing Step Properties:
        - ProcessingJobName: str - Processing job name
        - ProcessingJobStatus: str - Job status
        - ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri: str - Output URIs
    
    Transform Step Properties:
        - TransformJobName: str - Transform job name
        - TransformOutput.S3OutputPath: str - Output path
    
    Tuning Step Properties:
        - HyperParameterTuningJobName: str - Tuning job name
        - BestTrainingJob.TrainingJobName: str - Best job name
        - BestTrainingJob.ModelArtifacts.S3ModelArtifacts: str - Best model URI
        - BestTrainingJob.FinalMetricDataList[metric_name].Value: float - Best metrics

    Access pattern:
        step.properties.<PropertyPath>

    Notes:
        - Properties are lazy - not resolved until execution
        - Type-safe property access
        - Can be used in conditions, parameters, and step inputs
        - Creates implicit dependencies between steps
    """

PropertiesList

class PropertiesList:
    """
    List of properties for workflow expressions.

    Provides indexing access to list-type properties.

    Usage:
        Access list elements using bracket notation with integer index.
    
    Example:
        step.properties.FinalMetricDataList[0].MetricName
        step.properties.ProductionVariants[1].InstanceType

    Notes:
        - Zero-indexed
        - Out of bounds raises error at execution time
        - Can iterate with known indices
    """

PropertiesMap

class PropertiesMap:
    """
    Map/dictionary of properties for workflow expressions.

    Provides key-based access to map-type properties.

    Usage:
        Access map values using bracket notation with string keys.
    
    Example:
        step.properties.FinalMetricDataList["accuracy"].Value
        step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri

    Notes:
        - String keys only
        - Key must exist (error at execution if missing)
        - Case-sensitive key matching
    """

PropertyFile

class PropertyFile:
    """
    Property file struct for JSON outputs.

    Parameters:
        name: str - Property file name (required)
            - Used to reference in JsonGet
        output_name: str - Processing output name (required)
            - Must match ProcessingOutput output_name
        path: str - Path to JSON file within output (required)
            - Relative to ProcessingOutput source
            - Example: "metrics/evaluation.json"

    Usage:
        Define JSON files from processing steps for use with JsonGet.
        Enables conditional logic based on processing results.

    Notes:
        - File must be valid JSON
        - Maximum file size: 100 MB
        - Multiple property files per step allowed
    """

StepOutput

class StepOutput:
    """
    Reference to a step output value.

    Represents the output of a pipeline step for use in downstream steps.

    Usage:
        Automatically created when accessing step.properties.
        Creates implicit dependency from downstream to upstream step.

    Notes:
        - Not directly instantiated
        - Part of property resolution system
        - Ensures correct execution order
    """

get_step

def get_step(step_name: str, pipeline) -> Step:
    """
    Get a step from a pipeline by name.

    Parameters:
        step_name: str - Name of the step (required)
        pipeline: Pipeline - Pipeline containing the step (required)

    Returns:
        Step: The step with the given name

    Raises:
        ValueError: If step not found in pipeline

    Notes:
        - Helper for accessing steps dynamically
        - Useful in complex pipeline construction
    """

Properties Usage Examples:

# Access training job properties
model_s3_uri = train_step.properties.ModelArtifacts.S3ModelArtifacts
job_name = train_step.properties.TrainingJobName
training_time = train_step.properties.TrainingEndTime

# Access training metrics
accuracy = train_step.properties.FinalMetricDataList["validation:accuracy"].Value
loss = train_step.properties.FinalMetricDataList["loss"].Value
f1_score = train_step.properties.FinalMetricDataList["f1"].Value

# Access processing outputs
train_data_uri = process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
val_data_uri = process_step.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri

# Access tuning results
best_model = tuning_step.properties.BestTrainingJob.ModelArtifacts.S3ModelArtifacts
best_job_name = tuning_step.properties.BestTrainingJob.TrainingJobName

# Use properties in subsequent steps
transform_step = TransformStep(
    name="BatchInference",
    transformer=transformer,
    inputs=TransformInput(
        data=process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        content_type="application/json"
    )
)

# Register best model from tuning
register_step = ModelStep(
    name="RegisterBestModel",
    step_args={
        "model_data": tuning_step.properties.BestTrainingJob.ModelArtifacts.S3ModelArtifacts
    }
)
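
Transform step outputs follow the same pattern. A brief sketch, assuming the transform_step defined above and a downstream processor (the step name and container paths here are illustrative):

# Feed batch-transform output into a downstream processing step
report_step = ProcessingStep(
    name="AnalyzePredictions",
    processor=processor,
    inputs=[
        ProcessingInput(
            source=transform_step.properties.TransformOutput.S3OutputPath,
            destination="/opt/ml/processing/input/predictions"
        )
    ]
)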

Execution Variables

Built-in pipeline execution context variables.

ExecutionVariables

class ExecutionVariables:
    """
    Access to pipeline execution variables.

    Constants:
        START_DATETIME: str
            Pipeline execution start time (ISO 8601 format)
            Example: "2024-01-15T10:30:00Z"
        
        CURRENT_DATETIME: str
            Current datetime (evaluation time)
            Updates during execution
        
        PIPELINE_NAME: str
            Pipeline name
            Example: "my-training-pipeline"
        
        PIPELINE_ARN: str
            Pipeline ARN
            Format: "arn:aws:sagemaker:region:account:pipeline/name"
        
        PIPELINE_EXECUTION_ID: str
            Execution ID (unique per execution)
            Format: "abc123-def456-ghi789"
        
        PIPELINE_EXECUTION_ARN: str
            Execution ARN
            Format: "arn:aws:sagemaker:region:account:pipeline/name/execution/id"
        
        TRAINING_JOB_NAME: str
            Current training job name (within training step)
        
        PROCESSING_JOB_NAME: str
            Current processing job name (within processing step)

    Usage:
        Use these for dynamic naming, paths, metadata, and tracking.
        Available throughout pipeline execution.

    Notes:
        - All variables resolved at execution time
        - PIPELINE_EXECUTION_ID unique per execution (use for idempotency)
        - Useful for organizing outputs by execution
        - Can be used in Join, parameters, tags
    """

Usage:

from sagemaker.core.workflow import ExecutionVariables, Join

# Dynamic S3 paths using execution variables
base_uri = "s3://my-ml-bucket"

# Organized by pipeline and execution
output_path = Join(
    on="/",
    values=[
        base_uri,
        "outputs",
        ExecutionVariables.PIPELINE_NAME,
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ]
)
# Result: "s3://my-ml-bucket/outputs/my-pipeline/exec-abc-123"

# Organized by date
dated_path = Join(
    on="/",
    values=[
        base_uri,
        "daily-runs",
        ExecutionVariables.START_DATETIME,  # Includes date/time
        ExecutionVariables.PIPELINE_NAME
    ]
)

# Dynamic job naming for uniqueness
training_job_name = Join(
    on="-",
    values=[
        "training",
        ExecutionVariables.PIPELINE_NAME,
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ]
)

# Tag resources with execution metadata
tags = [
    {"Key": "Pipeline", "Value": ExecutionVariables.PIPELINE_NAME},
    {"Key": "ExecutionId", "Value": ExecutionVariables.PIPELINE_EXECUTION_ID},
    {"Key": "StartTime", "Value": ExecutionVariables.START_DATETIME},
    {"Key": "Project", "Value": "CustomerChurn"}
]

# Use in step configuration
train_step = TrainingStep(
    name="Train",
    estimator=trainer,
    # Job name automatically unique per execution
    # Outputs organized by execution
)

Execution Context in Processing:

# A processing script can read execution context that the step passes through;
# forward ExecutionVariables to the processor's environment (or as job arguments)
# so they are visible inside the container.
# process.py

import os
import json
from datetime import datetime

def main():
    # Execution variables forwarded as environment variables by the step configuration
    pipeline_name = os.environ.get('PIPELINE_NAME', 'unknown')
    execution_id = os.environ.get('PIPELINE_EXECUTION_ID', 'unknown')
    
    print(f"Running in pipeline: {pipeline_name}")
    print(f"Execution ID: {execution_id}")
    
    # Use for output organization
    output_dir = f"/opt/ml/processing/output/{execution_id}"
    os.makedirs(output_dir, exist_ok=True)
    
    # Save execution metadata
    metadata = {
        "pipeline_name": pipeline_name,
        "execution_id": execution_id,
        "processing_time": datetime.now().isoformat()
    }
    
    with open(f"{output_dir}/metadata.json", 'w') as f:
        json.dump(metadata, f)

if __name__ == '__main__':
    main()

Advanced Usage

Complex Conditional Logic

from sagemaker.core.workflow import (
    ConditionGreaterThan, ConditionLessThan,
    ConditionEquals, ConditionOr, ConditionNot
)
from sagemaker.mlops.workflow import ConditionStep

# Multiple thresholds
high_accuracy = ConditionGreaterThan(
    left=train_step.properties.FinalMetricDataList["accuracy"].Value,
    right=0.90
)

low_loss = ConditionLessThan(
    left=train_step.properties.FinalMetricDataList["loss"].Value,
    right=0.15
)

# Overfitting gap: conditions cannot subtract two metric values, so the training
# job emits the gap as its own metric (hypothetical "acc_gap") to compare against
no_overfitting = ConditionLessThan(
    left=train_step.properties.FinalMetricDataList["acc_gap"].Value,
    right=0.05  # Max 5% gap
)

# Combine multiple conditions
model_quality = ConditionOr(
    conditions=[
        high_accuracy,
        low_loss
    ]
)

model_not_overfit = ConditionNot(
    condition=ConditionGreaterThan(
        # Reuse the precomputed gap metric; properties cannot be combined with "+"
        left=train_step.properties.FinalMetricDataList["acc_gap"].Value,
        right=0.1
    )
)

# Final deployment condition
deploy_condition = ConditionOr(
    conditions=[
        model_quality,
        model_not_overfit
    ]
)

condition_step = ConditionStep(
    name="CheckQuality",
    conditions=[deploy_condition],
    if_steps=[register_step, deploy_step],
    else_steps=[notify_step, retrain_step]
)

Dynamic Resource Configuration

from sagemaker.core.workflow import ParameterString, ParameterInteger
# LambdaStep, LambdaOutput, and ModelTrainer/Compute come from the step and
# training modules documented elsewhere in this registry

# Parameters for dynamic resource allocation
data_size = ParameterString(
    name="DataSize",
    default_value="small",
    enum_values=["small", "medium", "large", "xlarge"]
)

# Map data size to resources (in Lambda step)
resource_mapper = LambdaStep(
    name="MapResources",
    lambda_func=resource_mapping_lambda,
    inputs={"data_size": data_size},
    outputs=[
        LambdaOutput(output_name="instance_type", output_type="String"),
        LambdaOutput(output_name="instance_count", output_type="Integer")
    ]
)

# Use mapped resources in training
trainer = ModelTrainer(
    compute=Compute(
        instance_type=resource_mapper.properties.Outputs["instance_type"],
        instance_count=resource_mapper.properties.Outputs["instance_count"]
    )
)
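
The mapping itself lives in the Lambda function. A minimal hypothetical handler consistent with the enum values and output names above:

# lambda_handler.py (hypothetical resource-mapping function)
def lambda_handler(event, context):
    size = event.get("data_size", "small")
    mapping = {
        "small":  {"instance_type": "ml.m5.xlarge",  "instance_count": 1},
        "medium": {"instance_type": "ml.m5.2xlarge", "instance_count": 2},
        "large":  {"instance_type": "ml.p3.2xlarge", "instance_count": 4},
        "xlarge": {"instance_type": "ml.p3.8xlarge", "instance_count": 8},
    }
    # Returned keys match the LambdaOutput names declared on the step
    return mapping[size]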

Helper Functions

is_pipeline_variable

def is_pipeline_variable(value: Any) -> bool:
    """
    Check if a value is a pipeline variable.

    Parameters:
        value: Any - Value to check

    Returns:
        bool: True if value is a pipeline variable (Parameter, Property, ExecutionVariable)

    Usage:
        Determine if special handling needed for pipeline variables.
    
    Example:
        if is_pipeline_variable(instance_type):
            # Will be resolved at execution time
        else:
            # Literal value, resolved now
    """

is_pipeline_parameter_string

def is_pipeline_parameter_string(value: Any) -> bool:
    """
    Check if a value is a parameter string.

    Parameters:
        value: Any - Value to check

    Returns:
        bool: True if value is a ParameterString instance

    Usage:
        Check if value is string parameter for specialized handling.
    """

Usage:

from sagemaker.core.workflow import (
    is_pipeline_variable,
    is_pipeline_parameter_string,
    ParameterString
)

instance_type_param = ParameterString(name="InstanceType")
literal_instance = "ml.m5.xlarge"

# Check type for conditional logic
def configure_training(instance_type):
    if is_pipeline_variable(instance_type):
        print("Instance type will be determined at execution time")
        # Cannot validate now
        return ModelTrainer(compute=Compute(instance_type=instance_type))
    else:
        print(f"Using literal instance type: {instance_type}")
        # Can validate immediately (validate_instance_type is a user-defined helper)
        validate_instance_type(instance_type)
        return ModelTrainer(compute=Compute(instance_type=instance_type))

trainer1 = configure_training(instance_type_param)  # Pipeline variable
trainer2 = configure_training(literal_instance)  # Literal

# Parameter string check
if is_pipeline_parameter_string(instance_type_param):
    print("This is a pipeline parameter")

Validation and Constraints

Parameter Constraints

  • Parameter name: 1-256 characters, start with letter, alphanumeric and underscores
  • Maximum parameters: 200 per pipeline
  • String default value: Maximum 1024 characters
  • Enum values: Maximum 100 values
  • Integer range: -2^31 to 2^31-1
  • Float precision: Double precision (IEEE 754)
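
A quick sketch of the naming rules in practice (the document states validation happens before the pipeline starts; exact error types may vary):

# Valid: starts with a letter, only alphanumerics and underscores, <= 256 chars
region = ParameterString(name="Deployment_Region", default_value="us-east-1")

# Names the rules above would reject:
#   ParameterString(name="2ndRegion")     # does not start with a letter
#   ParameterString(name="region-name")   # hyphen is not alphanumeric or underscore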

Property Constraints

  • Property path depth: Maximum 10 levels
  • Array index: Must be known at definition time (cannot be dynamic)
  • Map key: Must be string literal (not dynamic)
  • Property file size: Maximum 100 MB

Condition Constraints

  • Maximum conditions per ConditionStep: 10
  • Maximum nesting depth: 5 levels
  • ConditionOr conditions: Minimum 2, maximum 10

Common Error Scenarios

  1. Parameter Type Mismatch:

    • Cause: Passing a string value to a ParameterInteger
    • Solution: Ensure type matches parameter definition
  2. Property Path Invalid:

    • Cause: Accessing non-existent property
    • Solution: Check step output schema, verify property exists
  3. Circular Dependency:

    • Cause: Step A uses B's property, B uses A's property
    • Solution: Remove circular reference, restructure pipeline
  4. Property File Not Found:

    • Cause: Processing step didn't create expected JSON file
    • Solution: Verify processing code creates file at correct path
  5. JSON Path Error:

    • Cause: json_path doesn't match JSON structure
    • Solution: Verify JSON structure, test path with sample data
  6. Condition Type Error:

    • Cause: Comparing incompatible types
    • Solution: Ensure both operands have compatible types
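
Several of these surface before any step runs, as in the ValueError handling shown in the Parameters usage above. A defensive start-up sketch, assuming the parameterized pipeline defined earlier (the error messages are illustrative):

try:
    execution = pipeline.start(
        parameters={
            "InstanceCount": "four",               # type mismatch: string for an integer parameter
            "TrainingInstanceType": "ml.invalid"   # not in enum_values
        }
    )
except ValueError as e:
    # Validation failures are reported before the pipeline starts
    print(f"Pipeline start rejected: {e}")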