Unified interface for model deployment and inference with support for multiple frameworks, model servers, and deployment modes.
Important: The sagemaker.serve module only exports 3 classes from __init__.py:
- ModelBuilder
- InferenceSpec
- ModelServer

Other classes must be imported using their full module paths:
# Main exports (available from sagemaker.serve)
from sagemaker.serve import ModelBuilder, InferenceSpec, ModelServer
# Requires full module path
from sagemaker.serve.builder.schema_builder import SchemaBuilder
from sagemaker.serve.utils.payload_translator import CustomPayloadTranslator
from sagemaker.serve.model_server.multi_model_server.utils import LocalEndpoint
from sagemaker.serve.utils.predictors import AsyncPredictor
from sagemaker.serve.utils.async_inference import AsyncInferenceResponse, WaiterConfig
from sagemaker.serve.builder.bedrock_builder import BedrockModelBuilder

Main class for building and deploying models to SageMaker, replacing V2 Model and framework-specific classes.
class ModelBuilder:
"""
Unified interface for building and deploying models.
Parameters:
model: Optional[Union[object, str, ModelTrainer, TrainingJob, ModelPackage, List[Model]]]
- Model object (PyTorch, TensorFlow, sklearn, etc.)
- JumpStart or HuggingFace Hub model ID (e.g., "meta-llama/Llama-2-7b-hf")
- ModelTrainer instance (from training)
- TrainingJob instance (from completed training)
- ModelPackage from Model Registry
- List[Model] for multi-model endpoints
model_path: Optional[str] - Local directory or S3 URI for model artifacts
- Local: "./model" (contents tarred and uploaded)
- S3: "s3://bucket/model.tar.gz"
- Must contain model files loadable by inference code
inference_spec: Optional[InferenceSpec] - Custom inference specification
- Defines model loading and inference logic
- Required for custom model types
schema_builder: Optional[SchemaBuilder] - Input/output schema builder
- Auto-detects serializers/deserializers
- Provide sample input/output for detection
role_arn: Optional[str] - IAM role ARN (required for deployment)
- Needs: sagemaker:CreateModel, sagemaker:CreateEndpoint
- Plus: ecr:GetAuthorizationToken, ecr:BatchGetImage
sagemaker_session: Optional[Session] - SageMaker session
image_uri: Optional[str] - Container image URI
- Auto-detected if not provided
- Required for custom inference containers
s3_model_data_url: Optional[Union[str, Dict]] - S3 URI for model artifacts
- String: single model
- Dict: multi-model with {"model_name": "s3://..."}
source_code: Optional[SourceCode] - Source code configuration
- inference.py for custom inference logic
- requirements.txt for dependencies
env_vars: Optional[Dict[str, str]] - Environment variables
- Passed to inference container
- Example: {"MODEL_NAME": "my-model"}
model_server: Optional[ModelServer] - Model server type
- Auto-detected if not specified
- Options: TORCHSERVE, MMS, TENSORFLOW_SERVING, DJL_SERVING, TRITON, TGI, TEI
model_metadata: Optional[Dict[str, Any]] - Model metadata overrides
- Framework, version, etc.
compute: Optional[Compute] - Compute configuration
network: Optional[Networking] - Network configuration
instance_type: Optional[str] - EC2 instance type
- Inference: ml.t2.medium, ml.m5.xlarge, ml.g4dn.xlarge, etc.
mode: Optional[Mode] - Deployment mode
- Mode.SAGEMAKER_ENDPOINT: Deploy to SageMaker (default)
- Mode.LOCAL_CONTAINER: Deploy locally in Docker
- Mode.IN_PROCESS: Run in current Python process
log_level: Optional[int] - Logging level
- logging.DEBUG, logging.INFO, etc.
content_type: Optional[str] - Input MIME type
- Example: "application/json", "text/csv"
accept_type: Optional[str] - Output MIME type
- Example: "application/json"
Methods:
build(model_name=None, mode=None, role_arn=None, sagemaker_session=None, region=None) -> Model
Create SageMaker Model resource.
Parameters:
model_name: Optional[str] - Model name (default: auto-generated)
mode: Optional[Mode] - Override constructor mode
role_arn: Optional[str] - Override constructor role
sagemaker_session: Optional[Session] - Override session
region: Optional[str] - AWS region
Returns:
Model: SageMaker Model resource
Raises:
ValueError: Missing required parameters or invalid configuration
ClientError: AWS API errors
deploy(endpoint_name, initial_instance_count=None, instance_type=None, wait=True,
update_endpoint=False, container_timeout_in_seconds=None, inference_config=None,
**kwargs) -> Union[Endpoint, LocalEndpoint]
Deploy model to endpoint.
Parameters:
endpoint_name: str - Endpoint name (required)
initial_instance_count: Optional[int] - Number of instances (default: 1)
instance_type: Optional[str] - Instance type (default: from constructor)
wait: bool - Block until endpoint in service (default: True)
update_endpoint: bool - Update existing endpoint (default: False)
container_timeout_in_seconds: Optional[int] - Container timeout (default: 3600)
inference_config: Optional - Serverless, async, or batch config
**kwargs: Additional endpoint configuration
Returns:
Endpoint: Deployed endpoint (SageMaker or Local)
Raises:
ValueError: Invalid configuration
ClientError: Deployment errors
deploy_local(endpoint_name, container_timeout_in_seconds=None, **kwargs) -> LocalEndpoint
Deploy to local Docker container for testing.
Parameters:
endpoint_name: str - Endpoint name
container_timeout_in_seconds: Optional[int] - Timeout
**kwargs: Additional configuration
Returns:
LocalEndpoint: Local endpoint for testing
Raises:
RuntimeError: If Docker not available
optimize(model_name, output_path, instance_type, quantization_config=None,
compilation_config=None, sharding_config=None, **kwargs) -> Model
Optimize model for inference.
Parameters:
model_name: str - Optimized model name
output_path: str - S3 path for optimized artifacts
instance_type: str - Target instance type
quantization_config: Optional[QuantizationConfig] - Quantization settings
compilation_config: Optional[CompilationConfig] - Compilation settings
sharding_config: Optional[ShardingConfig] - Sharding for large models
**kwargs: Additional optimization parameters
Returns:
Model: Optimized model resource
Raises:
ValueError: Invalid optimization configuration
transformer(instance_count, instance_type, strategy=None, output_path=None,
**kwargs) -> Transformer
Create batch transformer for offline inference.
Parameters:
instance_count: int - Number of instances
instance_type: str - Instance type
strategy: Optional[str] - "MultiRecord" or "SingleRecord"
output_path: Optional[str] - S3 output path
**kwargs: Additional transformer configuration
Returns:
Transformer: Batch transform job manager
register(model_package_name=None, model_package_group_name=None,
content_types=None, response_types=None, inference_instances=None,
approval_status="PendingManualApproval", **kwargs) -> Union[ModelPackage, ModelPackageGroup]
Register model to Model Registry.
Parameters:
model_package_name: Optional[str] - Package name
model_package_group_name: Optional[str] - Package group name
content_types: Optional[List[str]] - Supported content types
response_types: Optional[List[str]] - Supported response types
inference_instances: Optional[List[str]] - Supported instance types
approval_status: str - Approval status
**kwargs: Additional registration parameters
Returns:
ModelPackage or ModelPackageGroup: Registered model
right_size(sample_payload_url=None, supported_content_types=None,
supported_instance_types=None, job_duration_in_seconds=None,
traffic_pattern=None, **kwargs) -> Dict
Get inference recommendations for optimal instance selection.
Parameters:
sample_payload_url: Optional[str] - S3 URI with sample payloads
supported_content_types: Optional[List[str]] - Content types to test
supported_instance_types: Optional[List[str]] - Instance types to test
job_duration_in_seconds: Optional[int] - Benchmark duration
traffic_pattern: Optional[List[Phase]] - Traffic pattern for advanced recommendations
**kwargs: Additional configuration
Returns:
Dict: Recommendations with instance types, costs, and latencies
set_deployment_config(config_name, instance_type) -> None
Set specific deployment configuration.
Parameters:
config_name: str - Configuration name
instance_type: str - Instance type for config
get_deployment_config() -> Dict
Get current deployment configuration.
Returns:
Dict: Current deployment settings
list_deployment_configs() -> List[str]
List available deployment configurations.
Returns:
List[str]: Available configuration names
Class Methods:
from_jumpstart_config(jumpstart_config, role_arn, compute=None, network=None,
image_uri=None, env_vars=None, model_kms_key=None,
resource_requirements=None, tolerate_vulnerable_model=False,
tolerate_deprecated_model=False, sagemaker_session=None,
schema_builder=None) -> ModelBuilder
Create ModelBuilder from JumpStart configuration.
Parameters:
jumpstart_config: JumpStartConfig - JumpStart model config (required)
role_arn: str - IAM role ARN (required)
... (other parameters override defaults from JumpStart)
Returns:
ModelBuilder: Configured builder for JumpStart model
Raises:
ValueError: Invalid or unavailable JumpStart model
Returns:
build() -> Model
deploy() -> Endpoint | LocalEndpoint
optimize() -> Model
transformer() -> Transformer
register() -> ModelPackage | ModelPackageGroup
Raises:
ValueError: Invalid configuration or missing required parameters
ClientError: AWS API errors
RuntimeError: Deployment or build errors
Notes:
- Auto-detects framework, model server, and serialization
- Supports: PyTorch, TensorFlow, XGBoost, sklearn, HuggingFace, custom
- Model servers: TorchServe, TensorFlow Serving, Triton, TGI, TEI, MMS
- Test locally with Mode.LOCAL_CONTAINER before production deployment
- Use schema_builder for automatic serialization handling
"""Basic Usage:
from sagemaker.serve import ModelBuilder
# Build and deploy a PyTorch model
builder = ModelBuilder(
model="my-model", # Can be model object or path
model_path="s3://my-bucket/model.tar.gz",
role_arn="arn:aws:iam::123456789012:role/SageMakerRole",
instance_type="ml.m5.xlarge"
)
# Build model resource
model = builder.build()
print(f"Model created: {model.model_name}")
# Deploy to endpoint
endpoint = builder.deploy(
endpoint_name="my-endpoint",
initial_instance_count=1,
instance_type="ml.m5.xlarge",
wait=True
)
# Invoke endpoint
result = endpoint.invoke(data=input_data)
print(f"Prediction: {result}")
# Clean up
endpoint.delete()

With Schema Builder:
from sagemaker.serve import ModelBuilder
from sagemaker.serve.builder.schema_builder import SchemaBuilder
import numpy as np
# Create sample input/output
sample_input = np.array([[1.0, 2.0, 3.0, 4.0]])
sample_output = np.array([0.85])
# Auto-detect serialization
schema_builder = SchemaBuilder(
sample_input=sample_input,
sample_output=sample_output
)
# Build with schema
builder = ModelBuilder(
model=my_trained_model,
schema_builder=schema_builder,
role_arn=role_arn
)
# Deploy - automatically handles numpy serialization
endpoint = builder.deploy(endpoint_name="auto-serialized-endpoint")
# Input/output automatically serialized
result = endpoint.invoke(data=np.array([[2.0, 3.0, 4.0, 5.0]]))

From Training Job:
from sagemaker.serve import ModelBuilder
from sagemaker.train import ModelTrainer
# Train model
trainer = ModelTrainer(...)
trainer.train(input_data_config=[train_data])
# Deploy directly from training job
builder = ModelBuilder(
model=trainer._latest_training_job, # Use training job output
role_arn=role_arn
)
endpoint = builder.deploy(endpoint_name="trained-model-endpoint")Abstract base class for defining custom model loading and inference logic.
Abstract base class for defining custom model loading and inference logic.
class InferenceSpec:
"""
Custom inference specification for models.
Abstract Methods (must be implemented):
load(model_dir: str) -> object
Load model from directory.
Parameters:
model_dir: str - Directory containing model artifacts
- Typically /opt/ml/model in container
- Contents of model.tar.gz extracted here
Returns:
object: Loaded model object (any type)
Raises:
Exception: If model loading fails
invoke(input_object: object, model: object) -> object
Perform inference on input data.
Parameters:
input_object: object - Deserialized input data
model: object - Model from load() method
Returns:
object: Prediction results
Raises:
Exception: If inference fails
Optional Methods:
preprocess(input_data: object) -> object
Pre-process input data before invoke().
Parameters:
input_data: object - Raw input data
Returns:
object: Preprocessed data for invoke()
postprocess(predictions: object) -> object
Post-process predictions after invoke().
Parameters:
predictions: object - Raw predictions from invoke()
Returns:
object: Processed predictions for response
prepare(*args, **kwargs) -> None
Preparation logic executed once before first inference.
Parameters:
*args, **kwargs: Preparation arguments
get_model() -> str
Get HuggingFace model name (for HF models).
Returns:
str: Model ID from HuggingFace Hub
Notes:
- load() called once at container startup
- invoke() called for each inference request
- preprocess/postprocess optional for data transformation
- Implement error handling in all methods
- Return types must match schema_builder expectations
"""Usage:
from sagemaker.serve import ModelBuilder, InferenceSpec
import torch
import json
class MyInferenceSpec(InferenceSpec):
"""Custom inference for PyTorch model."""
def load(self, model_dir):
"""Load PyTorch model."""
import torch
model_path = f"{model_dir}/model.pt"
# Load model with error handling
try:
model = torch.load(model_path, map_location='cpu')
model.eval()
return model
except FileNotFoundError:
raise RuntimeError(f"Model file not found: {model_path}")
except Exception as e:
raise RuntimeError(f"Failed to load model: {e}")
def invoke(self, input_object, model):
"""Run inference."""
with torch.no_grad():
predictions = model(input_object)
return predictions
def preprocess(self, input_data):
"""Convert input to tensor."""
import torch
import numpy as np
# Handle different input types
if isinstance(input_data, dict):
input_data = input_data.get("data", input_data)
if isinstance(input_data, list):
input_data = np.array(input_data)
return torch.tensor(input_data, dtype=torch.float32)
def postprocess(self, predictions):
"""Convert predictions to JSON-serializable format."""
return {
"predictions": predictions.numpy().tolist(),
"shape": list(predictions.shape)
}
# Use custom inference spec
builder = ModelBuilder(
inference_spec=MyInferenceSpec(),
model_path="./model",
role_arn="arn:aws:iam::123456789012:role/SageMakerRole"
)
endpoint = builder.deploy(endpoint_name="custom-inference-endpoint")Automatically detects serializers and deserializers for model inputs and outputs.
class SchemaBuilder:
"""
Automatic schema detection for serialization/deserialization.
Parameters:
sample_input: object - Sample input for schema detection (required)
- NumPy array → NumpySerializer
- Pandas DataFrame → CSVSerializer
- PyTorch Tensor → TorchTensorSerializer
- Dict/List → JSONSerializer
- Bytes → DataSerializer
- String → StringSerializer
sample_output: object - Sample output for schema detection (required)
- Same type detection as input
input_translator: Optional[CustomPayloadTranslator] - Custom input serialization
- Override auto-detection
output_translator: Optional[CustomPayloadTranslator] - Custom output serialization
- Override auto-detection
Properties:
input_serializer: BaseSerializer - Automatically detected input serializer (read-only)
input_deserializer: BaseDeserializer - Automatically detected input deserializer (read-only)
output_serializer: BaseSerializer - Automatically detected output serializer (read-only)
output_deserializer: BaseDeserializer - Automatically detected output deserializer (read-only)
custom_input_translator: Optional[CustomPayloadTranslator] - Custom input translator
custom_output_translator: Optional[CustomPayloadTranslator] - Custom output translator
sample_input: object - The input sample
sample_output: object - The output sample
Methods:
generate_marshalling_map() -> dict
Generate marshalling configuration for model server.
Returns:
dict: Marshalling configuration with serializers/deserializers
get_input_sample() -> object
Get input sample for testing.
Returns:
object: Sample input data
Supported Types:
- NumPy arrays (NumpySerializer/Deserializer) - preserves dtype, shape
- Pandas DataFrames (CSVSerializer/PandasDeserializer)
- PyTorch Tensors (TorchTensorSerializer/Deserializer) - preserves device
- TensorFlow Tensors (handled as NumPy)
- JSON-serializable objects (JSONSerializer/Deserializer)
- Bytes/Binary (DataSerializer/BytesDeserializer)
- Strings (StringSerializer/Deserializer)
Notes:
- Automatically configures serialization/deserialization pipeline
- Use with ModelBuilder for automatic format handling
- Sample shape should match production input shape
- Custom translators override auto-detection
- Detection based on Python type inspection
"""Usage:
from sagemaker.serve import ModelBuilder
from sagemaker.serve.builder.schema_builder import SchemaBuilder
import numpy as np
import pandas as pd
# NumPy arrays
sample_input = np.array([[1.0, 2.0, 3.0]])
sample_output = np.array([0.5])
schema_builder = SchemaBuilder(
sample_input=sample_input,
sample_output=sample_output
)
# Pandas DataFrame
df_input = pd.DataFrame([[1.0, 2.0, 3.0]], columns=['a', 'b', 'c'])
df_output = pd.DataFrame([[0.5]], columns=['prediction'])
schema_builder = SchemaBuilder(
sample_input=df_input,
sample_output=df_output
)
# JSON objects
json_input = {"features": [1.0, 2.0, 3.0], "metadata": {"id": 123}}
json_output = {"prediction": 0.5, "confidence": 0.92}
schema_builder = SchemaBuilder(
sample_input=json_input,
sample_output=json_output
)
# Use with ModelBuilder
builder = ModelBuilder(
model=my_model,
schema_builder=schema_builder,
role_arn=role_arn
)
# Automatic serialization/deserialization
endpoint = builder.deploy(endpoint_name="auto-schema-endpoint")
result = endpoint.invoke(data=sample_input)  # Automatically handled

Abstract base class for custom payload serialization/deserialization.
class CustomPayloadTranslator:
"""
Custom payload serialization/deserialization.
Parameters:
content_type: str - Content type (default: "application/custom")
- MIME type for serialized data
accept_type: str - Accept type (default: "application/custom")
- MIME type for response data
Abstract Methods (must be implemented):
serialize_payload_to_bytes(payload: object) -> bytes
Serialize payload to bytes.
Parameters:
payload: object - Python object to serialize
Returns:
bytes: Serialized data
Raises:
Exception: If serialization fails
deserialize_payload_from_stream(stream: IO) -> object
Deserialize payload from stream.
Parameters:
stream: IO - Input stream to read from
Returns:
object: Deserialized Python object
Raises:
Exception: If deserialization fails
Concrete Methods:
serialize(payload: object, content_type: str) -> bytes
Wrapper for serialization with content type.
Parameters:
payload: object - Data to serialize
content_type: str - Content type override
Returns:
bytes: Serialized data
deserialize(stream: IO, content_type: str) -> object
Wrapper for deserialization with content type.
Parameters:
stream: IO - Input stream
content_type: str - Content type of data
Returns:
object: Deserialized data
Properties:
CONTENT_TYPE: str - Content type string
ACCEPT: str - Accept type string
Notes:
- Implement for custom binary formats
- Use with SchemaBuilder for automatic application
- Handle errors gracefully in serialize/deserialize
- Consider compression for large payloads
"""Usage:
from sagemaker.serve.utils.payload_translator import CustomPayloadTranslator
from sagemaker.serve.builder.schema_builder import SchemaBuilder
import msgpack
import io
class MsgPackTranslator(CustomPayloadTranslator):
"""MessagePack serialization for efficient binary encoding."""
def __init__(self):
super().__init__(
content_type="application/msgpack",
accept_type="application/msgpack"
)
def serialize_payload_to_bytes(self, payload):
"""Serialize to MessagePack format."""
try:
return msgpack.packb(payload, use_bin_type=True)
except Exception as e:
raise RuntimeError(f"MessagePack serialization failed: {e}")
def deserialize_payload_from_stream(self, stream):
"""Deserialize from MessagePack format."""
try:
data = stream.read()
return msgpack.unpackb(data, raw=False)
except Exception as e:
raise RuntimeError(f"MessagePack deserialization failed: {e}")
# Use custom translator
translator = MsgPackTranslator()
schema_builder = SchemaBuilder(
sample_input={"features": [1.0, 2.0, 3.0]},
sample_output={"prediction": 0.5},
input_translator=translator,
output_translator=translator
)
builder = ModelBuilder(
model=my_model,
schema_builder=schema_builder,
role_arn=role_arn
)
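
A translator can be sanity-checked locally before it is wired into an endpoint. A minimal round-trip check using the MsgPackTranslator defined above:
import io
# Round-trip the payload through the translator (no endpoint required)
payload = {"features": [1.0, 2.0, 3.0]}
raw_bytes = translator.serialize_payload_to_bytes(payload)
restored = translator.deserialize_payload_from_stream(io.BytesIO(raw_bytes))
assert restored == payload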
Local endpoint for testing that mimics SageMaker Endpoint interface.
class LocalEndpoint:
"""
Local endpoint for testing inference.
Parameters:
endpoint_name: str - Endpoint name (required)
endpoint_config_name: str - Endpoint configuration name (required)
local_session: Optional - Local session for managing container
model_server: Optional[ModelServer] - Model server type
serializer: Optional - Request serializer
deserializer: Optional - Response deserializer
Methods:
invoke(body, content_type=None, accept=None, **kwargs) -> InvokeEndpointOutput
Make predictions on local endpoint.
Parameters:
body: Union[bytes, object] - Request body
content_type: Optional[str] - Content type (default: from serializer)
accept: Optional[str] - Accept type (default: from deserializer)
**kwargs: Additional invoke parameters
Returns:
InvokeEndpointOutput: Response with Body, ContentType fields
Raises:
RuntimeError: If container not running or inference fails
endpoint_status() -> str
Get endpoint status.
Returns:
str: "InService", "Creating", "Failed"
refresh() -> LocalEndpoint
Refresh endpoint state.
Returns:
LocalEndpoint: Self
delete() -> None
Delete the local endpoint and stop container.
Raises:
RuntimeError: If container cannot be stopped
update(endpoint_config_name: str) -> None
Update configuration.
Parameters:
endpoint_config_name: str - New config name
Class Methods:
create(endpoint_name, endpoint_config_name, **kwargs) -> LocalEndpoint
Create local endpoint.
Parameters:
endpoint_name: str - Endpoint name
endpoint_config_name: str - Config name
**kwargs: Additional configuration
Returns:
LocalEndpoint: Created endpoint
get(endpoint_name, local_session=None) -> Optional[LocalEndpoint]
Get existing local endpoint.
Parameters:
endpoint_name: str - Endpoint name
local_session: Optional - Local session
Returns:
LocalEndpoint or None: Endpoint if exists
Notes:
- Requires Docker installed and running
- Uses same interface as SageMaker Endpoint
- Useful for local testing before deployment
- No AWS charges for local testing
- Container logs available via Docker CLI
"""Usage:
from sagemaker.serve import ModelBuilder, Mode
# Deploy locally for testing
builder = ModelBuilder(
model=my_model,
schema_builder=schema_builder,
mode=Mode.LOCAL_CONTAINER
)
# Deploy to local Docker container
local_endpoint = builder.deploy_local(endpoint_name="local-test")
# Test inference
try:
result = local_endpoint.invoke(data=test_data)
print(f"Local test result: {result}")
# Check endpoint status
status = local_endpoint.endpoint_status()
print(f"Status: {status}")
except RuntimeError as e:
print(f"Local inference failed: {e}")
# Check Docker logs: docker logs <container-id>
finally:
# Clean up
local_endpoint.delete()
# Deploy to SageMaker after successful local testing
builder.mode = Mode.SAGEMAKER_ENDPOINT
endpoint = builder.deploy(endpoint_name="production-endpoint")class ModelServer(Enum):
"""
Available model server types.
Values:
TORCHSERVE = 1
- TorchServe model server for PyTorch models
- Supports: .pt, .pth model files
- Features: Multi-model serving, model versioning, metrics
MMS = 2
- Multi-Model Server (deprecated in favor of TorchServe)
- Legacy support for existing deployments
TENSORFLOW_SERVING = 3
- TensorFlow Serving for TensorFlow models
- Supports: SavedModel format
- Features: Model versioning, batching, REST/gRPC APIs
DJL_SERVING = 4
- Deep Java Library serving for large models
- Supports: PyTorch, HuggingFace transformers
- Features: Model parallelism, dynamic batching
TRITON = 5
- NVIDIA Triton Inference Server
- Supports: PyTorch, TensorFlow, ONNX, TensorRT
- Features: Multi-framework, GPU optimization, dynamic batching
TGI = 6
- Text Generation Inference (HuggingFace)
- Optimized for LLMs
- Features: Streaming, batching, quantization
TEI = 7
- Text Embeddings Inference (HuggingFace)
- Optimized for embedding models
- Features: Batching, multi-model
SMD = 8
- SageMaker Model Deployment (internal)
Notes:
- Auto-detected if not specified
- TorchServe: PyTorch models
- TensorFlow Serving: TensorFlow models
- Triton: Multi-framework, best GPU utilization
- TGI/TEI: Optimized for specific HuggingFace use cases
- DJL: Large models with model parallelism
"""class Mode(Enum):
"""
Deployment mode.
Values:
IN_PROCESS = 1
- Run locally in current Python process
- No containerization
- Fastest for development
- No isolation
- Memory constraints of current process
LOCAL_CONTAINER = 2
- Run locally in Docker container
- Same environment as SageMaker
- Test before deployment
- Requires Docker
- No AWS charges
SAGEMAKER_ENDPOINT = 3
- Deploy to SageMaker endpoint (default)
- Production deployment
- Scalable and managed
- AWS charges apply
- Monitoring and logging
Notes:
- IN_PROCESS: Quick iteration, no Docker required
- LOCAL_CONTAINER: Test deployment locally
- SAGEMAKER_ENDPOINT: Production use
- Progression: IN_PROCESS → LOCAL_CONTAINER → SAGEMAKER_ENDPOINT
"""class HardwareType(Enum):
"""
Hardware types for inference.
Values:
CPU = 1
- CPU-only instances (ml.m5, ml.c5, ml.t2)
- Lower cost
- Good for simple models
GPU = 2
- GPU instances (ml.g4dn, ml.p3, ml.p4d)
- High throughput for deep learning
- Best for image, video, NLP models
INFERENTIA_1 = 3
- AWS Inferentia 1 (ml.inf1)
- Optimized for inference
- Lower cost than GPU
- Requires model compilation
INFERENTIA_2 = 4
- AWS Inferentia 2 (ml.inf2)
- Latest inference chip
- Better performance than Inf1
- Supports larger models
GRAVITON = 5
- AWS Graviton (ARM processors)
- ml.c7g, ml.m7g instances
- Lower cost than x86
- Good for CPU-bound workloads
Notes:
- Choose based on:
- Model type and complexity
- Latency requirements
- Throughput needs
- Cost constraints
- GPU: Best for large models, real-time inference
- Inferentia: Cost-effective for batch or async inference
- CPU: Simple models, low traffic
"""class Network:
"""
Network configuration for deployment.
Fields:
subnets: Optional[List[str]] - VPC subnet IDs
- Format: ["subnet-xxx", "subnet-yyy"]
- Use multiple AZs for high availability
security_group_ids: Optional[List[str]] - Security group IDs
- Format: ["sg-xxx"]
- Must allow inbound HTTP/HTTPS for inference
enable_network_isolation: bool - Enable network isolation (default: False)
- Blocks all network except S3/ECR
- No internet access
- For compliance and security
vpc_config: Optional[Dict] - VPC configuration dictionary
- Alternative to subnets/security_group_ids
Notes:
- VPC deployment for private endpoints
- Security groups must allow:
- Inbound: Port 443 from clients
- Outbound: S3/ECR access
- Network isolation: model can't access external APIs
- VPC endpoints required for S3/ECR with isolation
"""class Compute:
"""
Compute configuration for deployment.
Fields:
instance_type: Optional[str] - EC2 instance type
- Inference instances: ml.t2.medium - ml.p4d.24xlarge
- Example: "ml.m5.xlarge", "ml.g4dn.xlarge"
instance_count: Optional[int] - Number of instances (default: 1)
- Range: 1-100 for real-time endpoints
- Use >1 for high availability and throughput
Notes:
- Start with smallest instance that meets latency needs
- Scale up for throughput, scale out for availability
- Use auto-scaling for variable traffic
- Monitor instance metrics to right-size
"""class Phase:
"""
Traffic pattern phase for Advanced Inference Recommendations.
Parameters:
duration_in_seconds: int - Phase duration (required)
- Length of this traffic phase
- Range: 120-3600 seconds
initial_number_of_users: int - Initial concurrent users (required)
- Starting concurrency
- Range: 1-10000
spawn_rate: int - User spawn rate (required)
- Users added per second
- Range: 1-10000
Properties:
to_json: dict - JSON representation for API
Notes:
- Define realistic traffic patterns for recommendations
- Multiple phases simulate varying load
- Example: ramp-up, steady-state, peak, cool-down
"""Usage:
from sagemaker.serve.builder.model_builder import Phase
# Define traffic pattern
phases = [
Phase(
duration_in_seconds=300, # 5 min ramp-up
initial_number_of_users=1,
spawn_rate=5 # Add 5 users/sec
),
Phase(
duration_in_seconds=600, # 10 min steady state
initial_number_of_users=1500,
spawn_rate=0
),
Phase(
duration_in_seconds=300, # 5 min peak
initial_number_of_users=1500,
spawn_rate=10 # Add 10 users/sec during the peak phase
)
]
# Get recommendations
recommendations = builder.right_size(
sample_payload_url="s3://bucket/payloads.jsonl",
supported_instance_types=["ml.m5.xlarge", "ml.m5.2xlarge", "ml.g4dn.xlarge"],
traffic_pattern=phases
)
print("Recommendations:")
for rec in recommendations['InferenceRecommendations']:
print(f" {rec['InstanceType']}: {rec['CostPerHour']}/hour, "
f"{rec['ExpectedLatency']}ms latency")class ModelLatencyThreshold:
"""
Latency threshold for inference recommendations.
Parameters:
percentile: str - Percentile (required)
- Format: "P50", "P90", "P95", "P99", "P99.9"
- Latency threshold at this percentile
value_in_milliseconds: int - Threshold value in ms (required)
- Maximum acceptable latency
- Range: 1-1000000
Properties:
to_json: dict - JSON representation
Notes:
- Recommendations filtered by latency requirement
- P50: median latency
- P99: 99th percentile (only 1% requests slower)
- Higher percentile = stricter requirement
"""Builder class for deploying SageMaker models to Amazon Bedrock.
Builder class for deploying SageMaker models to Amazon Bedrock.
class BedrockModelBuilder:
"""
Deploy SageMaker models to Amazon Bedrock.
Automatically detects model type (Nova vs. other models) and uses
the appropriate Bedrock API for deployment.
Parameters:
model: Optional[Union[ModelTrainer, TrainingJob, ModelPackage]] - Model to deploy
- ModelTrainer instance
- Completed TrainingJob
- ModelPackage from Model Registry
Methods:
deploy(job_name=None, imported_model_name=None, custom_model_name=None,
role_arn=None, job_tags=None, imported_model_tags=None,
model_tags=None, client_request_token=None,
imported_model_kms_key_id=None) -> Dict[str, Any]
Deploy to Bedrock.
Parameters:
job_name: Optional[str] - Import job name (for non-Nova models)
imported_model_name: Optional[str] - Imported model name (for non-Nova)
custom_model_name: Optional[str] - Custom model name (for Nova)
role_arn: Optional[str] - Bedrock service role ARN (required)
job_tags: Optional[List[Dict]] - Tags for import job
imported_model_tags: Optional[List[Dict]] - Tags for imported model
model_tags: Optional[List[Dict]] - Tags for custom model
client_request_token: Optional[str] - Idempotency token
imported_model_kms_key_id: Optional[str] - KMS key for encryption
Returns:
Dict[str, Any]: Response with jobArn (non-Nova) or modelArn (Nova)
Raises:
ValueError: Invalid model type or configuration
ClientError: Bedrock API errors
Detection Logic:
- Nova models: Uses create_custom_model API
- Other models: Uses create_model_import_job API
- Automatic detection based on model artifacts
Notes:
- Bedrock deployment for production LLM serving
- Different API for Nova vs non-Nova models
- Role needs Bedrock permissions
- Supported regions: check Bedrock documentation
"""Usage:
from sagemaker.serve.builder.bedrock_builder import BedrockModelBuilder
from sagemaker.core.resources import TrainingJob
# Get completed training job
training_job = TrainingJob.get("my-llama-fine-tune-job")
# Create Bedrock builder
bedrock_builder = BedrockModelBuilder(model=training_job)
# Deploy to Bedrock
response = bedrock_builder.deploy(
job_name="llama-bedrock-import",
imported_model_name="my-fine-tuned-llama",
role_arn="arn:aws:iam::123456789012:role/BedrockRole",
job_tags=[
{"Key": "Environment", "Value": "Production"},
{"Key": "Model", "Value": "Llama2-7B"}
],
imported_model_kms_key_id="arn:aws:kms:us-west-2:123:key/abc"
)
print(f"Job ARN: {response['jobArn']}")
print("Monitor import job in Bedrock console")Make asynchronous inference requests to SageMaker endpoints.
Make asynchronous inference requests to SageMaker endpoints.
class AsyncPredictor:
"""
Async prediction wrapper for SageMaker endpoints.
Parameters:
predictor: sagemaker.predictor.Predictor - Base predictor to wrap (required)
name: Optional[str] - Name for the async predictor
Methods:
predict(data, input_path=None, initial_args=None, inference_id=None, waiter_config=None) -> object
Wait for async inference result.
Parameters:
data: object - Input data to predict
input_path: Optional[str] - S3 path if data pre-uploaded
initial_args: Optional[Dict] - Additional arguments
inference_id: Optional[str] - Unique inference ID for tracking
waiter_config: Optional[WaiterConfig] - Custom waiter configuration
Returns:
object: Deserialized prediction result
Raises:
AsyncInferenceModelError: If inference failed
ObjectNotExistedError: If result not available within timeout
predict_async(data, input_path=None, initial_args=None, inference_id=None) -> AsyncInferenceResponse
Start async inference without waiting.
Parameters:
data: object - Input data
input_path: Optional[str] - S3 path if data pre-uploaded
initial_args: Optional[Dict] - Additional arguments
inference_id: Optional[str] - Unique inference ID
Returns:
AsyncInferenceResponse: Response object for retrieving result later
update_endpoint(initial_instance_count=None, instance_type=None,
accelerator_type=None, model_name=None, tags=None,
kms_key=None, data_capture_config_dict=None, wait=True) -> None
Update endpoint configuration.
delete_endpoint(delete_endpoint_config=True) -> None
Delete endpoint.
Parameters:
delete_endpoint_config: bool - Also delete endpoint config (default: True)
delete_model() -> None
Delete model resource.
enable_data_capture() -> None
Enable data capture on endpoint.
disable_data_capture() -> None
Disable data capture on endpoint.
update_data_capture_config(data_capture_config) -> None
Update data capture configuration.
list_monitors() -> list
List model monitors for this endpoint.
Returns:
list: Monitoring schedules
endpoint_context() -> Context
Get lineage context for endpoint.
Returns:
Context: Lineage context
Attributes:
predictor: Predictor - Underlying Predictor object
endpoint_name: str - SageMaker endpoint name
sagemaker_session: Session - SageMaker session
serializer: BaseSerializer - Request serializer
deserializer: BaseDeserializer - Response deserializer
Notes:
- Use for long-running inferences (>60 seconds)
- Results stored in S3, retrieved asynchronously
- Endpoint must be configured for async inference
- Suitable for large payloads (up to 1 GB)
- No request timeout limits
"""Usage:
from sagemaker.predictor import Predictor
from sagemaker.serve.utils.predictors import AsyncPredictor
from sagemaker.serve.utils.async_inference import WaiterConfig
# Exception import path may vary by SDK version
from sagemaker.exceptions import AsyncInferenceModelError, ObjectNotExistedError
# Create predictor for async endpoint
predictor = Predictor(endpoint_name="my-async-endpoint")
# Wrap with AsyncPredictor
async_predictor = AsyncPredictor(predictor, name="my-async-predictor")
# Make async prediction with blocking
waiter_config = WaiterConfig(max_attempts=100, delay=10)
try:
result = async_predictor.predict(
data={"large": "input data"},
waiter_config=waiter_config,
inference_id="request-123"
)
print(f"Result: {result}")
except AsyncInferenceModelError as e:
print(f"Inference failed: {e}")
except ObjectNotExistedError:
print("Result not ready within timeout period")
# Or start async without waiting
response = async_predictor.predict_async(
data=large_input,
inference_id="request-456"
)
print(f"Output will be at: {response.output_path}")
# Check result later
result = response.get_result(waiter_config=WaiterConfig(max_attempts=50))

Response object from async inference endpoints.
class AsyncInferenceResponse:
"""
Response from async inference request.
Parameters:
predictor_async: AsyncPredictor - The async predictor (required)
output_path: str - S3 location for successful results (required)
failure_path: str - S3 location for failed requests (required)
Methods:
get_result(waiter_config=None) -> object
Get inference result from S3.
Parameters:
waiter_config: Optional[WaiterConfig] - Custom waiter config
Returns:
object: Deserialized inference result
Raises:
ObjectNotExistedError: If inference still running
AsyncInferenceModelError: If inference failed
UnexpectedClientError: For other S3 errors
Attributes:
predictor_async: AsyncPredictor - AsyncPredictor instance
output_path: str - S3 path for inference output
failure_path: str - S3 path for failure information
_result: Optional[object] - Cached result (None until retrieved)
Notes:
- Results stored in S3 for retrieval
- Check failure_path if ObjectNotExistedError
- Result cached after first retrieval
- S3 paths cleaned up per endpoint configuration
"""Configuration for async inference waiters.
Configuration for async inference waiters.
class WaiterConfig:
"""
Waiter configuration for async inference.
Parameters:
max_attempts: int - Maximum number of attempts (default: 60)
- Number of times to check for result
- Range: 1-1000
delay: int - Delay between attempts in seconds (default: 15)
- Time to wait between checks
- Range: 1-300
Methods:
_to_request_dict() -> dict
Generate waiter configuration dict.
Returns:
dict: Configuration for boto3 waiter
Total Wait Time:
max_attempts * delay = total wait time in seconds
Default: 60 * 15 = 900 seconds (15 minutes)
Examples:
- Quick polling: WaiterConfig(max_attempts=100, delay=5) = 8.3 minutes
- Long-running: WaiterConfig(max_attempts=240, delay=30) = 2 hours
- Default: WaiterConfig(max_attempts=60, delay=15) = 15 minutes
Notes:
- Balance between responsiveness and API calls
- Longer delay reduces S3 API calls
- More attempts allows longer-running inference
- Consider inference time when setting values
"""Usage:
from sagemaker.serve.utils.predictors import AsyncPredictor
from sagemaker.serve.utils.async_inference import WaiterConfig
# Wait up to 30 minutes (120 attempts * 15 seconds)
long_waiter = WaiterConfig(max_attempts=120, delay=15)
result = async_predictor.predict(
data=my_large_input,
waiter_config=long_waiter
)
# Quick polling for fast inferences (5 second intervals)
quick_waiter = WaiterConfig(max_attempts=100, delay=5)
# Very long-running inference (2 hours)
very_long_waiter = WaiterConfig(max_attempts=240, delay=30)
# No waiting (check later manually)
response = async_predictor.predict_async(data=input_data)
# ... do other work ...
result = response.get_result(waiter_config=quick_waiter)

Serverless Inference:
from sagemaker.serve import ModelBuilder
from sagemaker.core.resources import ServerlessInferenceConfig
# Deploy with serverless inference
builder = ModelBuilder(
model="my-model",
model_path="s3://my-bucket/model.tar.gz",
role_arn=role_arn
)
serverless_config = ServerlessInferenceConfig(
memory_size_in_mb=2048, # 1024, 2048, 3072, 4096, 5120, 6144
max_concurrency=10 # 1-200
)
try:
endpoint = builder.deploy(
endpoint_name="serverless-endpoint",
inference_config=serverless_config
)
# No instance management needed
# Pay per inference request
# Auto-scales to zero
except ValueError as e:
if "memory_size" in str(e):
print("Invalid memory size - must be 1024-6144 in 1024 MB increments")Async Inference:
from sagemaker.core.resources import AsyncInferenceConfig
# Deploy with async inference
async_config = AsyncInferenceConfig(
output_path="s3://my-bucket/async-output",
max_concurrent_invocations_per_instance=10, # 1-1000
failure_path="s3://my-bucket/async-failures", # Optional
notification_config={ # Optional SNS notifications
"SuccessTopic": "arn:aws:sns:us-west-2:123:success-topic",
"ErrorTopic": "arn:aws:sns:us-west-2:123:error-topic"
}
)
endpoint = builder.deploy(
endpoint_name="async-endpoint",
initial_instance_count=2,
instance_type="ml.m5.2xlarge",
inference_config=async_config
)
# Use AsyncPredictor for requests
async_predictor = AsyncPredictor(Predictor(endpoint_name="async-endpoint"))
result = async_predictor.predict(data=large_input)

Batch Transform:
# Create transformer for batch inference
transformer = builder.transformer(
instance_count=2,
instance_type="ml.m5.xlarge",
strategy="MultiRecord", # Batch multiple records per request
output_path="s3://my-bucket/batch-output",
max_concurrent_transforms=8, # Parallel transforms per instance
max_payload=6 # Max payload size in MB
)
# Transform batch data
transformer.transform(
data="s3://my-bucket/batch-input",
content_type="application/json",
split_type="Line", # One record per line
join_source="Input", # Join output with input
input_filter="$.features", # Extract features field
output_filter="$.predictions" # Extract predictions field
)
# Wait for completion
transformer.wait()
# Results in: s3://my-bucket/batch-output/

Model Optimization:
from sagemaker.core.resources import QuantizationConfig, CompilationConfig
# Optimize model with quantization
optimized_model = builder.optimize(
model_name="optimized-model",
output_path="s3://my-bucket/optimized",
instance_type="ml.p4d.24xlarge",
quantization_config=QuantizationConfig(
target_precision="int8", # int8, fp16, bf16
optimization_level=2, # 1-3, higher = more aggressive
override_environment={
"MAX_BATCH_SIZE": "32",
"MAX_SEQUENCE_LENGTH": "512"
}
),
accept_eula=True
)
# Or compile for specific hardware
compiled_model = builder.optimize(
model_name="compiled-model",
output_path="s3://my-bucket/compiled",
instance_type="ml.inf2.xlarge", # Inferentia 2
compilation_config=CompilationConfig(
target_device="ml_inf2",
compiler_options={
"dtype": "fp16",
"num_neuroncores": 2
}
)
)
# Deploy optimized model
optimized_builder = ModelBuilder(
model=optimized_model,
role_arn=role_arn
)
endpoint = optimized_builder.deploy(endpoint_name="optimized-endpoint")

Local Testing:
from sagemaker.serve import ModelBuilder, Mode
# Test locally before deploying
builder = ModelBuilder(
model=my_model,
schema_builder=schema_builder,
mode=Mode.LOCAL_CONTAINER
)
# Deploy locally
local_endpoint = builder.deploy_local(endpoint_name="local-test")
# Test inference with various inputs
test_cases = [test_input_1, test_input_2, test_input_3]
for test_input in test_cases:
try:
result = local_endpoint.invoke(data=test_input)
print(f"Test passed: {result}")
except Exception as e:
print(f"Test failed: {e}")
# Clean up local resources
local_endpoint.delete()
# Deploy to SageMaker after validation
builder.mode = Mode.SAGEMAKER_ENDPOINT
endpoint = builder.deploy(endpoint_name="production-endpoint")Multi-Model Endpoint:
from sagemaker.core.resources import Model
# Deploy multiple models to single endpoint
models = [model1, model2, model3]
builder = ModelBuilder(
model=models, # List of models
role_arn=role_arn,
instance_type="ml.m5.2xlarge"
)
endpoint = builder.deploy(
endpoint_name="multi-model-endpoint",
initial_instance_count=2
)
# Invoke specific model
result = endpoint.invoke(
data=input_data,
target_model="model1" # Specify which model
)

Inference Recommendations:
# Get recommendations for optimal instance type
recommendations = builder.right_size(
sample_payload_url="s3://bucket/sample-payloads.jsonl",
supported_content_types=["application/json"],
supported_instance_types=[
"ml.m5.xlarge", "ml.m5.2xlarge",
"ml.c5.xlarge", "ml.c5.2xlarge",
"ml.g4dn.xlarge"
],
job_duration_in_seconds=600 # 10 minute benchmark
)
# Analyze recommendations
for rec in recommendations['InferenceRecommendations']:
instance = rec['InstanceType']
cost = rec['CostPerHour']
latency_p50 = rec['Metrics']['ModelLatency']
latency_p99 = rec['Metrics']['ModelLatencyP99']
throughput = rec['Metrics']['MaximumInvocations']
print(f"{instance}:")
print(f" Cost: ${cost}/hour")
print(f" Latency P50: {latency_p50}ms")
print(f" Latency P99: {latency_p99}ms")
print(f" Throughput: {throughput} invocations/sec")
# Choose recommended instance
best_rec = recommendations['InferenceRecommendations'][0]
endpoint = builder.deploy(
endpoint_name="optimized-endpoint",
instance_type=best_rec['InstanceType']
)

Model Registry Integration:
# Deploy and register to Model Registry
model_package = builder.register(
model_package_group_name="production-models",
content_types=["application/json"],
response_types=["application/json"],
inference_instances=["ml.m5.xlarge", "ml.m5.2xlarge"],
transform_instances=["ml.m5.xlarge"],
approval_status="PendingManualApproval",
approval_description="Awaiting QA approval",
customer_metadata_properties={
"model_type": "classification",
"training_date": "2024-01-15",
"accuracy": "0.95"
}
)
print(f"Registered: {model_package.model_package_arn}")
# Later: deploy from Model Registry
from sagemaker.core.resources import ModelPackage
model_pkg = ModelPackage.get(model_package_arn)
# Check approval status
if model_pkg.model_approval_status == "Approved":
# Deploy approved model
builder = ModelBuilder(
model=model_pkg,
role_arn=role_arn
)
endpoint = builder.deploy(endpoint_name="approved-model-endpoint")from sagemaker.serve import ModelBuilder, ModelServer
builder = ModelBuilder(
model=pytorch_model,
model_server=ModelServer.TORCHSERVE,
role_arn=role_arn
)
# Configure TorchServe-specific settings
builder.configure_for_torchserve(
shared_libs=["/usr/local/lib/custom.so"],
dependencies=["torch==2.0.0", "transformers==4.35.0"],
image_config={
"TS_MAX_REQUEST_SIZE": "100000000", # 100 MB
"TS_MAX_RESPONSE_SIZE": "100000000",
"TS_DEFAULT_WORKERS_PER_MODEL": "2"
}
)
endpoint = builder.deploy(endpoint_name="torchserve-endpoint")from sagemaker.serve import ModelBuilder, ModelServer
from sagemaker.serve.builder.schema_builder import SchemaBuilder
import torch
# Triton supports multiple frameworks
sample_input = torch.randn(1, 3, 224, 224)
sample_output = torch.randn(1, 1000)
schema = SchemaBuilder(sample_input, sample_output)
builder = ModelBuilder(
model=my_model,
schema_builder=schema,
model_server=ModelServer.TRITON,
role_arn=role_arn,
env_vars={
"TRITON_MAX_BATCH_SIZE": "8",
"TRITON_BUFFER_MANAGER_THREAD_COUNT": "2"
}
)
endpoint = builder.deploy(
endpoint_name="triton-endpoint",
instance_type="ml.g4dn.xlarge"
)

Text Generation Inference (TGI):
from sagemaker.serve import ModelBuilder, ModelServer
# Deploy LLM with TGI
builder = ModelBuilder(
model="meta-llama/Llama-2-7b-hf", # HuggingFace model
model_server=ModelServer.TGI,
role_arn=role_arn,
env_vars={
"MAX_INPUT_LENGTH": "1024",
"MAX_TOTAL_TOKENS": "2048",
"MAX_BATCH_TOTAL_TOKENS": "8192",
"QUANTIZE": "bitsandbytes-nf4" # Optional quantization
}
)
endpoint = builder.deploy(
endpoint_name="llm-endpoint",
instance_type="ml.g5.2xlarge"
)
# Streaming inference
response = endpoint.invoke_stream(
data={"inputs": "Once upon a time", "parameters": {"max_new_tokens": 100}}
)
for chunk in response:
print(chunk, end='', flush=True)

Auto-Scaling:
from sagemaker.serve import ModelBuilder
import boto3
# Deploy endpoint
endpoint = builder.deploy(
endpoint_name="auto-scaled-endpoint",
initial_instance_count=2,
instance_type="ml.m5.xlarge"
)
# Configure auto-scaling
client = boto3.client('application-autoscaling')
# Register scalable target
client.register_scalable_target(
ServiceNamespace='sagemaker',
ResourceId=f'endpoint/{endpoint.endpoint_name}/variant/variant-1',
ScalableDimension='sagemaker:variant:DesiredInstanceCount',
MinCapacity=2,
MaxCapacity=10
)
# Create scaling policy
client.put_scaling_policy(
PolicyName='scale-on-invocations',
ServiceNamespace='sagemaker',
ResourceId=f'endpoint/{endpoint.endpoint_name}/variant/variant-1',
ScalableDimension='sagemaker:variant:DesiredInstanceCount',
PolicyType='TargetTrackingScaling',
TargetTrackingScalingPolicyConfiguration={
'TargetValue': 70.0,
'PredefinedMetricSpecification': {
'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
},
'ScaleInCooldown': 300,
'ScaleOutCooldown': 60
}
)

Blue/Green Deployment:
# Deploy new version alongside old version
# Step 1: Deploy new model
new_endpoint = builder.deploy(
endpoint_name="model-v2-endpoint",
initial_instance_count=1,
instance_type="ml.m5.xlarge"
)
# Step 2: Route small percentage of traffic to new model
# (Use endpoint config with multiple variants)
from sagemaker.core.resources import EndpointConfig
config = EndpointConfig.create(
endpoint_config_name="blue-green-config",
production_variants=[
{
"VariantName": "blue",
"ModelName": old_model_name,
"InitialInstanceCount": 2,
"InstanceType": "ml.m5.xlarge",
"InitialVariantWeight": 0.9 # 90% traffic
},
{
"VariantName": "green",
"ModelName": new_model_name,
"InitialInstanceCount": 1,
"InstanceType": "ml.m5.xlarge",
"InitialVariantWeight": 0.1 # 10% traffic
}
]
)
# Update endpoint with blue/green config
endpoint.update(endpoint_config_name="blue-green-config")
endpoint.wait_for_in_service()
# Monitor green variant performance
# If successful, shift more traffic to green
# If issues, roll back by adjusting weights

A/B Testing:
from sagemaker.core.resources import Endpoint, EndpointConfig
from sagemaker.core.shapes import DataCaptureConfig  # import path may vary by SDK version
# Deploy two models for A/B testing
endpoint_config = EndpointConfig.create(
endpoint_config_name="ab-test-config",
production_variants=[
{
"VariantName": "model-a",
"ModelName": model_a_name,
"InitialInstanceCount": 1,
"InstanceType": "ml.m5.xlarge",
"InitialVariantWeight": 0.5 # 50% traffic
},
{
"VariantName": "model-b",
"ModelName": model_b_name,
"InitialInstanceCount": 1,
"InstanceType": "ml.m5.xlarge",
"InitialVariantWeight": 0.5 # 50% traffic
}
],
data_capture_config=DataCaptureConfig(
enable_capture=True,
sampling_percentage=100,
destination_s3_uri="s3://bucket/ab-test-capture"
)
)
endpoint = Endpoint.create(
endpoint_name="ab-test-endpoint",
endpoint_config_name="ab-test-config"
)
# Invoke with target variant
result_a = endpoint.invoke(
body=input_data,
target_variant="model-a"
)
result_b = endpoint.invoke(
body=input_data,
target_variant="model-b"
)

Shadow Deployment:
# Deploy new model as shadow variant (no traffic)
endpoint_config = EndpointConfig.create(
endpoint_config_name="shadow-config",
production_variants=[
{
"VariantName": "production",
"ModelName": prod_model,
"InitialInstanceCount": 2,
"InstanceType": "ml.m5.xlarge",
"InitialVariantWeight": 1.0 # All traffic
}
],
shadow_production_variants=[
{
"VariantName": "shadow",
"ModelName": new_model,
"InitialInstanceCount": 1,
"InstanceType": "ml.m5.xlarge",
"ShadowModeConfig": {
"SourceModelVariantName": "production",
"ShadowModelVariants": [
{
"SamplingPercentage": 100 # Shadow all requests
}
]
}
}
]
)
# Shadow variant receives copies of production traffic
# Compare outputs without affecting production
# Access shadow predictions via data capture

Troubleshooting:
Common error categories to watch for when deploying and invoking endpoints:
- Model Load Error
- Serialization Error
- Container Timeout
- Memory Error (OOM)
- Endpoint Throttling
- Async Result Not Found