Continuous model monitoring for data quality, model quality, bias drift, and explainability with automated alerting.
Base class for model monitoring with scheduling and execution management.
class ModelMonitor:
"""
Base model monitoring class.
Parameters:
role: str - IAM role ARN (required)
- Needs: sagemaker:CreateMonitoringSchedule, s3:GetObject, s3:PutObject
instance_count: int - Number of instances (default: 1)
instance_type: str - EC2 instance type (required)
volume_size_in_gb: int - EBS volume size (default: 30)
volume_kms_key: Optional[str] - KMS key for volume encryption
output_kms_key: Optional[str] - KMS key for output encryption
max_runtime_in_seconds: Optional[int] - Maximum runtime (default: 3600)
base_job_name: Optional[str] - Base job name
sagemaker_session: Optional[Session] - SageMaker session
env: Optional[Dict[str, str]] - Environment variables
tags: Optional[List[Tag]] - Resource tags
network_config: Optional[NetworkConfig] - Network configuration
Methods:
create_monitoring_schedule(monitor_schedule_name, endpoint_input, output,
constraints=None, statistics=None,
schedule_cron_expression=None, enable_cloudwatch_metrics=True) -> Dict
Create monitoring schedule.
Parameters:
monitor_schedule_name: str - Schedule name (required)
endpoint_input: EndpointInput - Endpoint to monitor (required)
output: MonitoringOutput - Output configuration (required)
constraints: Optional[str] - S3 URI for constraints baseline
statistics: Optional[str] - S3 URI for statistics baseline
schedule_cron_expression: Optional[str] - Cron schedule
enable_cloudwatch_metrics: bool - Publish to CloudWatch (default: True)
Returns:
Dict: Response with MonitoringScheduleArn
Raises:
ValueError: Invalid configuration or endpoint not found
ClientError: AWS API errors
update_monitoring_schedule(monitor_schedule_name, endpoint_input=None, output=None,
constraints=None, statistics=None,
schedule_cron_expression=None, enable_cloudwatch_metrics=None) -> Dict
Update existing monitoring schedule.
Parameters:
monitor_schedule_name: str - Schedule name (required)
... (other parameters override existing values)
Returns:
Dict: Response with MonitoringScheduleArn
delete_monitoring_schedule(monitor_schedule_name=None) -> None
Delete monitoring schedule.
Parameters:
monitor_schedule_name: Optional[str] - Schedule name
Raises:
ClientError: If schedule not found or deletion fails
describe_schedule(monitor_schedule_name=None) -> Dict
Get schedule details.
Returns:
Dict: Complete schedule description
list_executions(schedule_name=None, status_equals=None, sort_by="CreationTime",
sort_order="Descending", max_results=100) -> List[Dict]
List monitoring executions.
Parameters:
schedule_name: Optional[str] - Filter by schedule
status_equals: Optional[str] - Filter by status
sort_by: str - Sort field
sort_order: str - Sort order
max_results: int - Maximum results (1-100)
Returns:
List[Dict]: Execution summaries
suggest_baseline(baseline_dataset, dataset_format, output_s3_uri, wait=True, logs=True) -> None
Create baseline for monitoring.
Parameters:
baseline_dataset: str - S3 URI for baseline data (required)
dataset_format: Dict - Dataset format specification (required)
output_s3_uri: str - S3 URI for baseline outputs (required)
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
Raises:
ValueError: Invalid dataset format
ClientError: AWS API errors
Attributes:
latest_monitoring_job: ProcessingJob - Most recent monitoring execution
Notes:
- Baseline created from training/validation data
- Monitoring compares production data against baseline
- CloudWatch metrics enable alarms
- Violations written to S3 and optionally to SNS
"""Default monitoring for data drift detection and quality checks.
class DefaultModelMonitor(ModelMonitor):
"""
Default model monitor for data quality and drift.
Monitors:
- Data drift from baseline distribution
- Statistical properties changes (mean, std, min, max)
- Schema violations (new/missing features)
- Missing values increase
- Distribution shifts (KL divergence, Wasserstein distance)
Methods:
create_monitoring_schedule(monitor_schedule_name, endpoint_input, output,
statistics=None, constraints=None,
schedule_cron_expression=None, enable_cloudwatch_metrics=True,
data_quality_monitoring_config=None) -> Dict
Create data quality monitoring schedule.
Parameters:
monitor_schedule_name: str - Schedule name (required)
endpoint_input: EndpointInput - Endpoint to monitor (required)
output: MonitoringOutput - Output configuration (required)
statistics: Optional[str] - Baseline statistics S3 URI
constraints: Optional[str] - Baseline constraints S3 URI
schedule_cron_expression: Optional[str] - Cron schedule
enable_cloudwatch_metrics: bool - Publish metrics (default: True)
data_quality_monitoring_config: Optional[Dict] - Custom config
Returns:
Dict: Response with schedule ARN
suggest_baseline(baseline_dataset, dataset_format, output_s3_uri, wait=True, logs=True) -> None
Generate baseline statistics and constraints.
Parameters:
baseline_dataset: str - S3 URI (required)
dataset_format: Dict - Format specification (required)
- Example: {"csv": {"header": True}}
output_s3_uri: str - S3 URI for outputs (required)
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
Creates:
- statistics.json: Statistical properties
- constraints.json: Data quality constraints
Configuration:
Uses DataQualityMonitoringConfig for custom settings
Notes:
- First step: create baseline from training data
- Second step: create schedule to monitor endpoint
- Violations trigger CloudWatch alarms if configured
- Check violations report in output S3 location
"""Usage:
from sagemaker.core.model_monitor import DefaultModelMonitor, DataCaptureConfig, EndpointInput, MonitoringOutput, CronExpressionGenerator
# Step 1: Enable data capture on endpoint
data_capture_config = DataCaptureConfig(
enable_capture=True,
sampling_percentage=100, # Capture 100% of requests
destination_s3_uri="s3://my-bucket/data-capture",
capture_options=["REQUEST", "RESPONSE"] # Capture both
)
# Deploy endpoint with data capture
endpoint = builder.deploy(
endpoint_name="monitored-endpoint",
data_capture_config=data_capture_config
)
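# (Optional) Send a few requests so there is captured traffic to monitor.
# Minimal sketch using the boto3 SageMaker runtime client; the payload below is a
# placeholder and must match your model's expected input format.
import boto3
runtime = boto3.client("sagemaker-runtime")
runtime.invoke_endpoint(
    EndpointName="monitored-endpoint",
    ContentType="text/csv",
    Body=b"0.5,0.3,0.2,0.1",
)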
# Step 2: Create monitor
monitor = DefaultModelMonitor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_count=1,
instance_type="ml.m5.xlarge",
volume_size_in_gb=30
)
# Step 3: Generate baseline from training data
try:
monitor.suggest_baseline(
baseline_dataset="s3://my-bucket/training-data/baseline.csv",
dataset_format={"csv": {"header": True, "separator": ","}},
output_s3_uri="s3://my-bucket/baseline",
wait=True,
logs=True
)
print("Baseline created:")
print(f" Statistics: s3://my-bucket/baseline/statistics.json")
print(f" Constraints: s3://my-bucket/baseline/constraints.json")
except RuntimeError as e:
print(f"Baseline creation failed: {e}")
# Step 4: Create monitoring schedule
monitor.create_monitoring_schedule(
monitor_schedule_name="data-quality-monitor",
endpoint_input=EndpointInput(
endpoint_name="monitored-endpoint",
destination="/opt/ml/processing/input/endpoint"
),
output=MonitoringOutput(
source="/opt/ml/processing/output",
destination="s3://my-bucket/monitoring-results"
),
statistics="s3://my-bucket/baseline/statistics.json",
constraints="s3://my-bucket/baseline/constraints.json",
schedule_cron_expression=CronExpressionGenerator.hourly(),
enable_cloudwatch_metrics=True
)
print("Monitoring schedule created - checking hourly for data drift")Analyzing Violations:
# List monitoring executions
executions = monitor.list_executions(
schedule_name="data-quality-monitor",
sort_by="ScheduledTime",
sort_order="Descending",
max_results=10
)
# Check for violations
for execution in executions:
if execution.get('MonitoringExecutionStatus') == 'CompletedWithViolations':
print(f"Violations detected at {execution['ScheduledTime']}")
        # Download violations report from the monitoring output destination
        # configured above (s3://my-bucket/monitoring-results)
        from sagemaker.core.s3 import S3Downloader
        violations = S3Downloader.read_file(
            s3_uri="s3://my-bucket/monitoring-results/constraint_violations.json"
        )
        print(f"Violations: {violations}")

Monitors model prediction quality against ground truth labels.
class ModelQualityMonitor(ModelMonitor):
"""
Model quality monitoring for prediction accuracy.
Monitors:
- Accuracy metrics (accuracy, precision, recall, F1)
- Regression metrics (MAE, MSE, RMSE, R²)
- Prediction drift from baseline
- Label distribution drift
- Ground truth comparison
Methods:
create_monitoring_schedule(monitor_schedule_name, endpoint_input, ground_truth_input,
problem_type, output, constraints=None, statistics=None,
schedule_cron_expression=None, enable_cloudwatch_metrics=True) -> Dict
Create model quality monitoring schedule.
Parameters:
monitor_schedule_name: str - Schedule name (required)
endpoint_input: EndpointInput - Endpoint to monitor (required)
ground_truth_input: str - S3 URI for ground truth labels (required)
problem_type: str - Problem type (required)
- "BinaryClassification"
- "MulticlassClassification"
- "Regression"
output: MonitoringOutput - Output configuration (required)
constraints: Optional[str] - Baseline constraints S3 URI
statistics: Optional[str] - Baseline statistics S3 URI
schedule_cron_expression: Optional[str] - Cron schedule
enable_cloudwatch_metrics: bool - Publish metrics (default: True)
Returns:
Dict: Response with schedule ARN
Raises:
ValueError: Invalid problem_type or missing ground truth
suggest_baseline(baseline_dataset, dataset_format, problem_type,
                 inference_attribute, ground_truth_attribute, output_s3_uri,
                 probability_attribute=None, wait=True, logs=True) -> None
Generate quality baseline.
Parameters:
baseline_dataset: str - S3 URI (required)
dataset_format: Dict - Format specification (required)
problem_type: str - Problem type (required)
    - "BinaryClassification", "MulticlassClassification", or "Regression"
inference_attribute: str - Column name for predictions (required)
probability_attribute: Optional[str] - Column for probabilities
- Required for classification with probability_threshold
ground_truth_attribute: str - Column for ground truth (required)
output_s3_uri: str - S3 URI for outputs (required)
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
Notes:
- Requires ground truth labels for comparison
- Ground truth provided separately from captured data
- Metrics depend on problem_type
- Baseline should include diverse scenarios
- Monitor detects model degradation over time
"""Usage:
from sagemaker.core.model_monitor import ModelQualityMonitor, EndpointInput, MonitoringOutput, CronExpressionGenerator
# Create model quality monitor
quality_monitor = ModelQualityMonitor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_count=1,
instance_type="ml.m5.xlarge"
)
# Generate quality baseline with predictions and labels
quality_monitor.suggest_baseline(
baseline_dataset="s3://my-bucket/validation-with-predictions.csv",
dataset_format={"csv": {"header": True}},
problem_type="BinaryClassification",
inference_attribute="prediction",
probability_attribute="probability",
ground_truth_attribute="label",
output_s3_uri="s3://my-bucket/quality-baseline",
wait=True
)
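# (Hypothetical sketch) Upload ground truth labels so the scheduled quality jobs can
# compute metrics. The CSV columns follow the "Ground Truth Label Format" shown below;
# the bucket and key are placeholders.
import boto3
ground_truth_csv = (
    "inference_id,timestamp,ground_truth_label\n"
    "abc123,2024-01-15T10:30:00Z,1\n"
    "def456,2024-01-15T10:31:00Z,0\n"
)
boto3.client("s3").put_object(
    Bucket="my-bucket",
    Key="ground-truth-labels/2024/01/15/labels.csv",
    Body=ground_truth_csv.encode("utf-8"),
)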
# Create monitoring schedule
# Note: Ground truth must be provided for ongoing monitoring
quality_monitor.create_monitoring_schedule(
monitor_schedule_name="model-quality-monitor",
endpoint_input=EndpointInput(
endpoint_name="my-endpoint",
destination="/opt/ml/processing/input/endpoint",
inference_attribute="prediction",
probability_attribute="probability"
),
ground_truth_input="s3://my-bucket/ground-truth-labels/", # Updated regularly
problem_type="BinaryClassification",
output=MonitoringOutput(
source="/opt/ml/processing/output",
destination="s3://my-bucket/quality-monitoring"
),
constraints="s3://my-bucket/quality-baseline/constraints.json",
statistics="s3://my-bucket/quality-baseline/statistics.json",
schedule_cron_expression=CronExpressionGenerator.daily(),
enable_cloudwatch_metrics=True
)
print("Model quality monitoring active")Ground Truth Label Format:
Ground truth labels must match captured inference data by record ID or timestamp:
inference_id,timestamp,ground_truth_label
abc123,2024-01-15T10:30:00Z,1
def456,2024-01-15T10:31:00Z,0

Or for regression:
inference_id,timestamp,ground_truth_value
abc123,2024-01-15T10:30:00Z,42.5
def456,2024-01-15T10:31:00Z,38.2

Monitors for bias drift in model predictions.
class ModelBiasMonitor(ModelMonitor):
"""
Model bias monitoring for fairness.
Monitors:
- Bias metrics (DI, DPL, CDDL, DCR, AD, RD, etc.)
- Protected attribute drift
- Fairness metric changes over time
- Bias amplification detection
Methods:
create_monitoring_schedule(monitor_schedule_name, endpoint_input, ground_truth_input,
analysis_config, output, constraints=None,
schedule_cron_expression=None, enable_cloudwatch_metrics=True) -> Dict
Create bias monitoring schedule.
Parameters:
monitor_schedule_name: str - Schedule name (required)
endpoint_input: EndpointInput - Endpoint to monitor (required)
ground_truth_input: str - S3 URI for ground truth (required)
analysis_config: BiasAnalysisConfig - Bias configuration (required)
output: MonitoringOutput - Output configuration (required)
constraints: Optional[str] - Baseline constraints S3 URI
schedule_cron_expression: Optional[str] - Cron schedule
enable_cloudwatch_metrics: bool - Publish metrics (default: True)
Returns:
Dict: Response with schedule ARN
suggest_baseline(baseline_dataset, dataset_format, analysis_config,
output_s3_uri, wait=True, logs=True) -> None
Generate bias baseline.
Parameters:
baseline_dataset: str - S3 URI (required)
dataset_format: Dict - Format specification (required)
analysis_config: BiasAnalysisConfig - Bias config (required)
output_s3_uri: str - S3 URI for outputs (required)
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
Configuration:
Uses BiasAnalysisConfig for bias settings
Notes:
- Requires ground truth labels
- Monitors post-training bias metrics
- Detects bias drift over time
- Use with protected attributes (gender, race, age, etc.)
- Alerts on fairness metric violations
"""Usage:
from sagemaker.core.model_monitor import ModelBiasMonitor, BiasAnalysisConfig
# Configure bias analysis
bias_config = BiasAnalysisConfig(
label_values_or_threshold=[1], # Positive outcome value
facet_name="gender", # Protected attribute
facet_values_or_threshold=[0], # Reference group (e.g., male=0)
predicted_label_name="prediction"
)
# Create bias monitor
bias_monitor = ModelBiasMonitor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_count=1,
instance_type="ml.m5.xlarge"
)
# Generate baseline
bias_monitor.suggest_baseline(
baseline_dataset="s3://my-bucket/training-data.csv",
dataset_format={"csv": {"header": True}},
analysis_config=bias_config,
output_s3_uri="s3://my-bucket/bias-baseline",
wait=True
)
# Create monitoring schedule
bias_monitor.create_monitoring_schedule(
monitor_schedule_name="bias-drift-monitor",
endpoint_input=EndpointInput(
endpoint_name="my-endpoint",
destination="/opt/ml/processing/input",
features_attribute="features",
inference_attribute="prediction"
),
ground_truth_input="s3://my-bucket/ground-truth/",
analysis_config=bias_config,
output=MonitoringOutput(
source="/opt/ml/processing/output",
destination="s3://my-bucket/bias-monitoring"
),
constraints="s3://my-bucket/bias-baseline/constraints.json",
schedule_cron_expression=CronExpressionGenerator.daily()
)
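# (Optional) Check recent bias-monitoring runs; a minimal sketch using list_executions
# as documented on the base ModelMonitor class above.
for execution in bias_monitor.list_executions(schedule_name="bias-drift-monitor", max_results=5):
    print(execution.get("MonitoringExecutionStatus"), execution.get("ScheduledTime"))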
# Monitoring alerts fire if bias metrics drift beyond baseline thresholds

Monitors feature attribution and model explainability over time.
class ModelExplainabilityMonitor(ModelMonitor):
"""
Model explainability monitoring with SHAP.
Monitors:
- Feature importance drift
- SHAP value distribution changes
- Attribution stability over time
- Explanation consistency
Methods:
create_monitoring_schedule(monitor_schedule_name, endpoint_input, analysis_config,
output, constraints=None, schedule_cron_expression=None,
enable_cloudwatch_metrics=True) -> Dict
Create explainability monitoring schedule.
Parameters:
monitor_schedule_name: str - Schedule name (required)
endpoint_input: EndpointInput - Endpoint to monitor (required)
analysis_config: ExplainabilityAnalysisConfig - SHAP config (required)
output: MonitoringOutput - Output configuration (required)
constraints: Optional[str] - Baseline constraints S3 URI
schedule_cron_expression: Optional[str] - Cron schedule
enable_cloudwatch_metrics: bool - Publish metrics (default: True)
Returns:
Dict: Response with schedule ARN
suggest_baseline(baseline_dataset, dataset_format, analysis_config,
output_s3_uri, wait=True, logs=True) -> None
Generate explainability baseline.
Parameters:
baseline_dataset: str - S3 URI (required)
dataset_format: Dict - Format specification (required)
analysis_config: ExplainabilityAnalysisConfig - Config (required)
output_s3_uri: str - S3 URI for outputs (required)
wait: bool - Block until completion (default: True)
logs: bool - Show logs (default: True)
Configuration:
Uses ExplainabilityAnalysisConfig for SHAP settings
Notes:
- Monitors feature importance changes
- Detects explanation drift
- More computationally expensive than data quality monitoring
- Requires model that supports SHAP
"""Usage:
from sagemaker.core.model_monitor import ModelExplainabilityMonitor, ExplainabilityAnalysisConfig
from sagemaker.core.clarify import SHAPConfig, ClarifyShapBaselineConfig
# Configure explainability analysis
baseline_config = ClarifyShapBaselineConfig(
mime_type="text/csv",
shap_baseline="0.5,0.3,0.2,0.1" # Baseline feature values
)
shap_config = SHAPConfig(
baseline=baseline_config,
num_samples=100,
agg_method="mean_abs",
save_local_shap_values=True
)
explainability_config = ExplainabilityAnalysisConfig(
explainability_config=shap_config,
model_config={
"model_name": "my-model",
"instance_type": "ml.m5.xlarge",
"initial_instance_count": 1,
"content_type": "text/csv",
"accept_type": "application/json"
}
)
# Create explainability monitor
explainability_monitor = ModelExplainabilityMonitor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
instance_count=1,
instance_type="ml.m5.xlarge"
)
# Generate baseline
explainability_monitor.suggest_baseline(
baseline_dataset="s3://bucket/baseline.csv",
dataset_format={"csv": {"header": False}},
analysis_config=explainability_config,
output_s3_uri="s3://bucket/explainability-baseline",
wait=True
)
# Create monitoring schedule
explainability_monitor.create_monitoring_schedule(
monitor_schedule_name="explainability-drift-monitor",
endpoint_input=EndpointInput(
endpoint_name="my-endpoint",
destination="/opt/ml/processing/input"
),
analysis_config=explainability_config,
output=MonitoringOutput(
source="/opt/ml/processing/output",
destination="s3://bucket/explainability-monitoring"
),
constraints="s3://bucket/explainability-baseline/constraints.json",
schedule_cron_expression=CronExpressionGenerator.daily()
)

class DataCaptureConfig:
"""
Data capture configuration for endpoints.
Parameters:
enable_capture: bool - Enable data capture (default: False)
sampling_percentage: int - Percentage of requests to capture (default: 100)
- Range: 0-100
- Lower percentage reduces storage costs
destination_s3_uri: str - S3 URI for captured data (required)
kms_key_id: Optional[str] - KMS key for encryption
capture_options: List[str] - What to capture (default: ["REQUEST", "RESPONSE"])
- Options: ["REQUEST"], ["RESPONSE"], or ["REQUEST", "RESPONSE"]
csv_content_types: Optional[List[str]] - CSV content types
- Example: ["text/csv", "application/csv"]
json_content_types: Optional[List[str]] - JSON content types
- Example: ["application/json", "application/jsonlines"]
Notes:
- Attach to endpoint at deployment time
- Cannot be added after endpoint creation (requires update)
- Captured data used by all monitor types
- Storage costs: plan for data retention
- Default retention: data never deleted (manage with S3 lifecycle)
"""class EndpointInput:
"""
Endpoint input configuration for monitoring.
Parameters:
endpoint_name: str - Endpoint name to monitor (required)
destination: str - Container path for data (required)
- Example: "/opt/ml/processing/input/endpoint"
s3_input_mode: str - Input mode (default: "File")
- "File" or "Pipe"
s3_data_distribution_type: str - Distribution type (default: "FullyReplicated")
features_attribute: Optional[str] - Features attribute name
- For JSON data: JSONPath to features
inference_attribute: Optional[str] - Inference attribute name
- Column/field with model predictions
probability_attribute: Optional[str] - Probability attribute name
- Column/field with prediction probabilities
Notes:
- Endpoint must have data capture enabled
- Data automatically pulled from S3 capture location
- Attributes specify how to parse captured data
"""class MonitoringOutput:
"""
Monitoring output configuration.
Parameters:
source: str - Container output path (required)
- Example: "/opt/ml/processing/output"
destination: str - S3 URI for monitoring results (required)
s3_upload_mode: str - Upload mode (default: "EndOfJob")
- "EndOfJob": Upload after monitoring completes
- "Continuous": Upload during monitoring
Notes:
- Results include:
- violations.json: Detected violations
- statistics.json: Current statistics
- constraint_violations.json: Detailed violations
- Continuous mode for large outputs
"""class CronExpressionGenerator:
"""
Generate cron expressions for monitoring schedules.
Class Methods:
hourly() -> str
Every hour at minute 0.
Returns: "cron(0 * ? * * *)"
daily() -> str
Daily at midnight UTC.
Returns: "cron(0 0 ? * * *)"
daily_every_x_hours(hour) -> str
Daily every X hours.
Parameters:
hour: int - Hour interval (1-24)
Returns:
str: Cron expression
now() -> str
Immediate (one-time execution).
Returns: Current timestamp
Custom Expression:
Use standard cron format: "cron(minute hour day month day-of-week year)"
- minute: 0-59
- hour: 0-23 (UTC)
- day: 1-31 or ?
- month: 1-12 or ?
- day-of-week: 1-7 or ?
- year: * or specific year
Examples:
- Hourly: "cron(0 * ? * * *)"
- Daily at 2 AM UTC: "cron(0 2 ? * * *)"
- Every 6 hours: "cron(0 */6 ? * * *)"
- Weekdays at noon: "cron(0 12 ? * MON-FRI *)"
Notes:
- All times in UTC
- Minimum frequency: hourly
- ? means "no specific value" (use for day or day-of-week)
"""class MonitoringExecution:
"""
Monitoring execution details.
Methods:
wait(logs=True) -> None
Wait for execution to complete.
Parameters:
logs: bool - Show CloudWatch logs (default: True)
Raises:
WaiterError: If execution fails
describe() -> Dict
Get execution details.
Returns:
Dict: Complete execution description
Attributes:
monitoring_schedule_name: str - Parent schedule name
scheduled_time: datetime - Scheduled execution time
creation_time: datetime - Actual creation time
last_modified_time: datetime - Last modification time
monitoring_execution_status: str - Status
- "Pending", "InProgress", "Completed", "CompletedWithViolations", "Failed", "Stopped"
processing_job_arn: str - Processing job ARN
endpoint_name: str - Monitored endpoint name
failure_reason: Optional[str] - Failure reason if failed
violation_report_uri: Optional[str] - S3 URI for violations report
Notes:
- CompletedWithViolations means monitoring ran but found issues
- Check violation_report_uri for details
- Failed means monitoring job itself failed
"""class MonitoringAlertActions:
"""
Actions to take on monitoring alerts.
Methods:
add_sns_topic(topic_arn) -> None
Add SNS notification.
Parameters:
topic_arn: str - SNS topic ARN
add_lambda(function_arn) -> None
Add Lambda trigger.
Parameters:
function_arn: str - Lambda function ARN
add_cloudwatch_alarm(alarm_name) -> None
Add CloudWatch alarm.
Parameters:
alarm_name: str - Alarm name
Notes:
- Multiple actions can be configured
- SNS: send notifications to email, SMS, etc.
- Lambda: trigger custom response logic
- CloudWatch: integrate with existing alarm infrastructure
"""Usage:
from sagemaker.core.model_monitor import MonitoringAlertActions
# Configure alerts
alert_actions = MonitoringAlertActions()
# Email notifications
alert_actions.add_sns_topic("arn:aws:sns:us-west-2:123:ml-team-alerts")
# Custom handling with Lambda
alert_actions.add_lambda("arn:aws:lambda:us-west-2:123:function:handle-violations")
# CloudWatch alarm integration
alert_actions.add_cloudwatch_alarm("model-quality-alarm")
# Create schedule with alerts
monitor.create_monitoring_schedule(
monitor_schedule_name="monitored-endpoint-with-alerts",
endpoint_input=endpoint_input,
output=output,
schedule_cron_expression=CronExpressionGenerator.hourly(),
alert_actions=alert_actions
)
# Alerts triggered when violations detected

SNS Topic Example:
import boto3
# Create SNS topic for alerts
sns = boto3.client('sns')
topic_response = sns.create_topic(Name='sagemaker-monitoring-alerts')
topic_arn = topic_response['TopicArn']
# Subscribe email
sns.subscribe(
TopicArn=topic_arn,
Protocol='email',
Endpoint='ml-team@company.com'
)
# Use in monitoring
alert_actions = MonitoringAlertActions()
alert_actions.add_sns_topic(topic_arn)

Lambda Handler Example:
# lambda_handler.py
import json
import boto3
def lambda_handler(event, context):
"""
Handle monitoring violations.
Event structure:
{
"monitoringScheduleName": "...",
"endpointName": "...",
"violationReport": "s3://...",
"timestamp": "..."
}
"""
schedule_name = event['monitoringScheduleName']
endpoint = event['endpointName']
violations_uri = event['violationReport']
    # Download and parse the violations report
    s3 = boto3.client('s3')
    bucket, key = violations_uri.replace("s3://", "").split("/", 1)
    report = s3.get_object(Bucket=bucket, Key=key)
    violations = json.loads(report['Body'].read())
    # Take action based on severity
    # (is_critical, send_page_alert, and log_violation are placeholders
    #  for your own response logic)
    if is_critical(violations):
        # Disable endpoint or alert on-call
        send_page_alert(endpoint, violations)
    else:
        # Log for investigation
        log_violation(endpoint, violations)
    return {'statusCode': 200}

class ModelDashboardIndicatorAction:
"""
Dashboard indicator actions for model monitoring.
Parameters:
enabled: bool - Enable dashboard indicators (required)
Notes:
- Integrates with SageMaker Model Dashboard
- Shows monitoring status visually
- Enables at-a-glance health checks
"""from sagemaker.core.model_monitor import (
DefaultModelMonitor,
ModelQualityMonitor,
ModelBiasMonitor,
ModelExplainabilityMonitor,
CronExpressionGenerator
)
# Deploy endpoint with data capture
endpoint = builder.deploy(
endpoint_name="fully-monitored-endpoint",
data_capture_config=DataCaptureConfig(
enable_capture=True,
sampling_percentage=100,
destination_s3_uri="s3://bucket/data-capture"
)
)
# Setup all monitoring types
# (bias_config and explainability_config are the analysis configs built in the earlier
# sections; model-quality baselining also needs problem_type and attribute arguments,
# see ModelQualityMonitor.suggest_baseline above)
monitors = [
    (DefaultModelMonitor(role=role), "data-quality", CronExpressionGenerator.hourly(), None),
    (ModelQualityMonitor(role=role), "model-quality", CronExpressionGenerator.daily(), None),
    (ModelBiasMonitor(role=role), "bias-drift", CronExpressionGenerator.daily(), bias_config),
    (ModelExplainabilityMonitor(role=role), "explainability", CronExpressionGenerator.daily(), explainability_config)
]
# Create all schedules
for monitor, name, schedule, config in monitors:
# Generate baselines first
baseline_uri = f"s3://bucket/baselines/{name}"
monitor.suggest_baseline(
baseline_dataset="s3://bucket/training-data.csv",
dataset_format={"csv": {"header": True}},
output_s3_uri=baseline_uri,
**({"analysis_config": config} if config else {}),
wait=True
)
# Create schedule
schedule_config = {
"monitor_schedule_name": f"{endpoint.endpoint_name}-{name}",
"endpoint_input": EndpointInput(endpoint_name=endpoint.endpoint_name),
"output": MonitoringOutput(
source="/opt/ml/processing/output",
destination=f"s3://bucket/monitoring/{name}"
),
"constraints": f"{baseline_uri}/constraints.json",
"statistics": f"{baseline_uri}/statistics.json",
"schedule_cron_expression": schedule,
"enable_cloudwatch_metrics": True
}
# Add type-specific config
if name == "model-quality":
schedule_config["problem_type"] = "BinaryClassification"
schedule_config["ground_truth_input"] = "s3://bucket/ground-truth"
elif config:
schedule_config["analysis_config"] = config
monitor.create_monitoring_schedule(**schedule_config)
print("All monitoring types active")from sagemaker.core.model_monitor import BatchTransformInput
# Monitor batch transform instead of real-time endpoint
batch_input = BatchTransformInput(
data_captured_destination_s3_uri="s3://bucket/batch-capture",
destination="/opt/ml/processing/input",
dataset_format={"csv": {"header": False}},
s3_input_mode="File",
s3_data_distribution_type="FullyReplicated"
)
monitor.create_monitoring_schedule(
monitor_schedule_name="batch-transform-monitor",
batch_transform_input=batch_input, # Instead of endpoint_input
output=output,
statistics="s3://bucket/baseline/statistics.json",
constraints="s3://bucket/baseline/constraints.json",
schedule_cron_expression=CronExpressionGenerator.daily()
)

# Use custom monitoring logic
custom_monitor = ModelMonitor(
role=role,
image_uri="123456789012.dkr.ecr.us-west-2.amazonaws.com/custom-monitor:latest",
instance_type="ml.m5.xlarge",
env={
"CUSTOM_THRESHOLD": "0.05",
"ALERT_EMAIL": "team@company.com"
}
)
# Custom container must:
# 1. Read captured data from /opt/ml/processing/input
# 2. Compare against baseline
# 3. Write violations to /opt/ml/processing/output/violations.json
# 4. Exit with code 0 (success) or non-zero (failure)
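A minimal sketch of a container entrypoint that satisfies these requirements (the file name, drift check, and threshold handling are illustrative placeholders):
# monitor_script.py - runs inside the custom monitoring container
import json
import os
import sys
threshold = float(os.environ.get("CUSTOM_THRESHOLD", "0.05"))
# 1. Read captured data from the processing input path
records = []
for root, _, files in os.walk("/opt/ml/processing/input"):
    for name in files:
        with open(os.path.join(root, name)) as f:
            records.extend(line.strip() for line in f if line.strip())
# 2./3. Compare against your baseline and collect violations
violations = []  # populate with your own drift checks using `records` and `threshold`
output_path = "/opt/ml/processing/output/violations.json"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w") as f:
    json.dump({"violations": violations}, f)
# 4. Exit code 0 = success; non-zero marks the monitoring job as failed
sys.exit(0)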
custom_monitor.create_monitoring_schedule(
monitor_schedule_name="custom-monitor",
endpoint_input=endpoint_input,
output=output,
schedule_cron_expression=CronExpressionGenerator.hourly()
)

import boto3
# Create CloudWatch alarm on monitoring metrics
cloudwatch = boto3.client('cloudwatch')
# Alarm on data drift
cloudwatch.put_metric_alarm(
AlarmName='model-data-drift',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=1,
MetricName='feature_baseline_drift_distance',
Namespace='aws/sagemaker/Endpoints/data-metrics',
Period=3600, # 1 hour
Statistic='Average',
Threshold=0.1, # Alert if drift > 0.1
ActionsEnabled=True,
AlarmActions=['arn:aws:sns:us-west-2:123:alerts'],
AlarmDescription='Alert on significant data drift',
Dimensions=[
{'Name': 'Endpoint', 'Value': 'my-endpoint'},
{'Name': 'MonitoringSchedule', 'Value': 'data-quality-monitor'}
]
)

Troubleshooting:
- No Data Captured: confirm enable_capture=True and sampling_percentage > 0 in DataCaptureConfig, and that the endpoint is actually receiving traffic (see the verification sketch below).
- Baseline Generation Failed: check the baselining processing job logs in CloudWatch; common causes are a dataset_format that does not match the data or an unreadable baseline S3 URI.
- Ground Truth Not Found: verify labels are uploaded to the ground_truth_input S3 prefix on the cadence the schedule expects.
- Monitoring Execution Failed: inspect failure_reason on the execution (see MonitoringExecution above) and the underlying processing job logs.
- Too Many Violations: review the generated constraints.json and relax overly strict constraints before re-creating the schedule.
- Schedule Not Triggering: schedules run at most hourly and all times are UTC; confirm the cron expression and that the schedule status is "Scheduled".
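For the "No Data Captured" case, a minimal sketch (bucket and prefix are the ones configured in DataCaptureConfig earlier) to verify capture files are landing in S3:
import boto3
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="my-bucket", Prefix="data-capture/")
for obj in response.get("Contents", []):
    print(obj["Key"])
if response.get("KeyCount", 0) == 0:
    print("No captured files yet - check enable_capture, sampling_percentage, and endpoint traffic")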