Low-level, data-driven core of boto 3 providing foundational AWS service access.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Resource state waiters that poll AWS services until resources reach desired states, with configurable polling intervals and timeout handling. Waiters provide a reliable mechanism to wait for asynchronous operations to complete, such as EC2 instances becoming available or DynamoDB tables reaching active status.
Access waiters through AWS service clients to wait for resource state changes.
class BaseClient:
    """Waiter-related surface of an AWS service client (API signature stub; bodies omitted)."""

    def get_waiter(self, waiter_name: str) -> Waiter:
        """
        Get a waiter instance for polling resource states.

        Args:
            waiter_name: Name of the waiter (e.g., 'instance_running', 'bucket_exists')

        Returns:
            Waiter: Configured waiter instance for the specified resource state

        Raises:
            ValueError: If waiter_name is not available for the service
        """

    @property
    def waiter_names(self) -> List[str]:
        """
        List of available waiter names for this service.

        Returns:
            List[str]: Available waiter names
        """


# Primary waiter implementation that handles polling and state transitions.
class Waiter:
    """Polls an AWS operation until a resource reaches a desired state (API signature stub; bodies omitted)."""

    def __init__(
        self,
        name: str,
        config: SingleWaiterConfig,
        operation_method: callable
    ):
        """
        Initialize a waiter instance.

        Args:
            name: Waiter name identifier
            config: Waiter configuration and acceptor rules
            operation_method: AWS operation method to call for polling
        """

    @property
    def name(self) -> str:
        """
        Waiter name identifier.

        Returns:
            str: The name of this waiter
        """

    @property
    def config(self) -> SingleWaiterConfig:
        """
        Waiter configuration including polling settings and acceptors.

        Returns:
            SingleWaiterConfig: Configuration object with delay, max_attempts, and acceptors
        """

    def wait(self, **kwargs) -> None:
        """
        Wait for the resource to reach the desired state.

        Args:
            **kwargs: Parameters to pass to the underlying AWS operation
            WaiterConfig: Optional waiter configuration overrides, passed as a
                keyword argument (a dict with 'Delay' and/or 'MaxAttempts' keys)

        Raises:
            WaiterError: If waiter times out, encounters failure state, or receives errors
        """


# Configure waiter behavior with custom timing and retry settings.
class WaiterModel:
    """Collection of waiter definitions loaded for one service (API signature stub; bodies omitted)."""

    # Only waiter configuration files declaring this format version are accepted.
    SUPPORTED_VERSION: int = 2

    def __init__(self, waiter_config: dict):
        """
        Initialize waiter model from service configuration.

        Args:
            waiter_config: Loaded waiter configuration from service model

        Raises:
            WaiterConfigError: If waiter version is unsupported
        """

    @property
    def version(self) -> int:
        """
        Waiter configuration format version.

        Returns:
            int: Configuration version (always 2 for supported models)
        """

    @property
    def waiter_names(self) -> List[str]:
        """
        Available waiter names in this model.

        Returns:
            List[str]: Sorted list of waiter names
        """

    def get_waiter(self, waiter_name: str) -> SingleWaiterConfig:
        """
        Get configuration for a specific waiter.

        Args:
            waiter_name: Name of the waiter to retrieve

        Returns:
            SingleWaiterConfig: Waiter configuration object

        Raises:
            ValueError: If waiter_name does not exist
        """
class SingleWaiterConfig:
    """Configuration of one named waiter (API signature stub; bodies omitted)."""

    def __init__(self, single_waiter_config: dict):
        """
        Initialize single waiter configuration.

        Args:
            single_waiter_config: Configuration dictionary for one waiter
        """

    @property
    def description(self) -> str:
        """
        Human-readable waiter description.

        Returns:
            str: Waiter description text
        """

    @property
    def operation(self) -> str:
        """
        AWS operation name used for polling.

        Returns:
            str: Operation name (e.g., 'DescribeInstances', 'HeadBucket')
        """

    @property
    def delay(self) -> int:
        """
        Default delay between polling attempts in seconds.

        Returns:
            int: Delay in seconds
        """

    @property
    def max_attempts(self) -> int:
        """
        Maximum number of polling attempts before timeout.

        Returns:
            int: Maximum attempts count
        """

    @property
    def acceptors(self) -> List[AcceptorConfig]:
        """
        List of acceptor rules that determine state transitions.

        Returns:
            List[AcceptorConfig]: Acceptor configurations
        """
class AcceptorConfig:
    """One acceptor rule of a waiter (API signature stub; bodies omitted)."""

    def __init__(self, config: dict):
        """
        Initialize acceptor configuration.

        Args:
            config: Acceptor configuration dictionary
        """

    @property
    def state(self) -> str:
        """
        Target state for this acceptor ('success', 'failure', 'retry').

        Returns:
            str: State transition target
        """

    @property
    def matcher(self) -> str:
        """
        Matcher type ('path', 'pathAll', 'pathAny', 'status', 'error').

        Returns:
            str: Matcher type identifier
        """

    @property
    def expected(self) -> Any:
        """
        Expected value for successful match.

        Returns:
            Any: Expected value (string, int, bool, etc.)
        """

    @property
    def argument(self) -> str:
        """
        JMESPath expression or matcher argument.

        Returns:
            str: JMESPath expression for path matchers
        """

    @property
    def explanation(self) -> str:
        """
        Human-readable explanation of what this acceptor matches.

        Returns:
            str: Explanation text for debugging
        """

    @property
    def matcher_func(self) -> callable:
        """
        Compiled matcher function that evaluates responses.

        Returns:
            callable: Function that takes response dict and returns bool
        """


# Wait for AWS resources to reach desired states using service-specific waiters.
from botocore.session import get_session
# Create session and clients
session = get_session()
ec2_client = session.create_client('ec2', region_name='us-east-1')
s3_client = session.create_client('s3', region_name='us-east-1')
# Wait for EC2 instance to be running
instance_id = 'i-1234567890abcdef0'
waiter = ec2_client.get_waiter('instance_running')
waiter.wait(InstanceIds=[instance_id])
print(f"Instance {instance_id} is now running")
# Wait for S3 bucket to exist
bucket_name = 'my-example-bucket'
waiter = s3_client.get_waiter('bucket_exists')
waiter.wait(Bucket=bucket_name)
print(f"Bucket {bucket_name} exists and is accessible")

Override default polling behavior with custom timing settings.
# Wait with custom configuration
waiter = ec2_client.get_waiter('instance_stopped')
waiter.wait(
InstanceIds=['i-1234567890abcdef0'],
WaiterConfig={
'Delay': 10, # Poll every 10 seconds instead of default
'MaxAttempts': 60 # Try up to 60 times instead of default
}
)

Wait for DynamoDB table operations to complete.
dynamodb_client = session.create_client('dynamodb', region_name='us-east-1')
# Create table and wait for it to become active
dynamodb_client.create_table(
TableName='MyTable',
KeySchema=[
{'AttributeName': 'id', 'KeyType': 'HASH'}
],
AttributeDefinitions=[
{'AttributeName': 'id', 'AttributeType': 'S'}
],
BillingMode='PAY_PER_REQUEST'
)
# Wait for table to be ready
waiter = dynamodb_client.get_waiter('table_exists')
waiter.wait(TableName='MyTable')
print("Table is now active and ready to use")
# Delete table and wait for deletion to complete
dynamodb_client.delete_table(TableName='MyTable')
waiter = dynamodb_client.get_waiter('table_not_exists')
waiter.wait(TableName='MyTable')
print("Table has been successfully deleted")

Monitor CloudFormation stack operations until completion.
cloudformation_client = session.create_client('cloudformation', region_name='us-east-1')
# Create stack and wait for completion
cloudformation_client.create_stack(
StackName='my-stack',
TemplateBody=template_content,
Parameters=[
{'ParameterKey': 'Environment', 'ParameterValue': 'production'}
]
)
# Wait for stack creation to complete
waiter = cloudformation_client.get_waiter('stack_create_complete')
waiter.wait(StackName='my-stack')
print("Stack creation completed successfully")
# Update stack and wait for update completion
cloudformation_client.update_stack(
StackName='my-stack',
TemplateBody=updated_template_content
)
waiter = cloudformation_client.get_waiter('stack_update_complete')
waiter.wait(StackName='my-stack')
print("Stack update completed successfully")

Find out which waiters are available for a service.
# List all available waiters for a service
ec2_waiters = ec2_client.waiter_names
print(f"EC2 waiters: {ec2_waiters}")
s3_waiters = s3_client.waiter_names
print(f"S3 waiters: {s3_waiters}")

# Check if specific waiter exists
if 'instance_running' in ec2_client.waiter_names:
    # The waiter exposes its default polling settings through ``config``
    waiter = ec2_client.get_waiter('instance_running')
    print(f"Waiter config: delay={waiter.config.delay}s, max_attempts={waiter.config.max_attempts}")

# Handle waiter timeouts and failure conditions properly.
from botocore.exceptions import WaiterError
try:
    waiter = ec2_client.get_waiter('instance_running')
    waiter.wait(
        InstanceIds=['i-1234567890abcdef0'],
        WaiterConfig={'MaxAttempts': 10}
    )
except WaiterError as e:
    # WaiterError keeps its constructor arguments in ``kwargs``; it does not
    # expose ``reason`` as an attribute, so read it from there.
    reason = e.kwargs['reason']
    print(f"Waiter failed: {reason}")
    print(f"Last response: {e.last_response}")
    # Handle specific failure cases
    if 'Max attempts exceeded' in reason:
        print("Instance took too long to start")
    elif 'terminal failure state' in reason:
        print("Instance failed to start properly")

# Wait for Lambda function states and configurations.
lambda_client = session.create_client('lambda', region_name='us-east-1')
# Wait for function to be active after creation
waiter = lambda_client.get_waiter('function_active')
waiter.wait(FunctionName='my-function')
print("Lambda function is active")
# Wait for function configuration update to complete
lambda_client.update_function_configuration(
FunctionName='my-function',
Runtime='python3.9',
Handler='lambda_function.lambda_handler'
)
waiter = lambda_client.get_waiter('function_updated')
waiter.wait(FunctionName='my-function')
print("Function configuration update completed")

EC2:
- instance_exists: Instance appears in describe_instances
- instance_running: Instance reaches 'running' state
- instance_stopped: Instance reaches 'stopped' state
- instance_terminated: Instance reaches 'terminated' state
- instance_status_ok: Instance passes status checks

S3:
- bucket_exists: Bucket is accessible via HEAD request
- bucket_not_exists: Bucket no longer exists or is inaccessible
- object_exists: Object exists in bucket
- object_not_exists: Object no longer exists in bucket

RDS:
- db_instance_available: Database instance is available
- db_instance_deleted: Database instance has been deleted
- db_snapshot_available: Database snapshot is available
- db_snapshot_deleted: Database snapshot has been deleted

Elastic Load Balancing:
- any_instance_in_service: At least one instance is in service
- instance_deregistered: Instance is no longer registered
- instance_in_service: Specific instance is in service

Control polling behavior through waiter configuration parameters.
waiter_config = {
'Delay': 5, # Seconds to wait between attempts
'MaxAttempts': 40 # Maximum number of polling attempts
}
waiter.wait(ResourceId='resource-123', WaiterConfig=waiter_config)

Waiters use different matcher types to evaluate responses: 'path', 'pathAll', 'pathAny', 'status', and 'error'.
Create custom waiters using the waiter model system.
from botocore.waiter import WaiterModel, create_waiter_with_client
# Define custom waiter configuration
waiter_config = {
'version': 2,
'waiters': {
'CustomResourceReady': {
'delay': 10,
'maxAttempts': 30,
'operation': 'DescribeCustomResource',
'acceptors': [
{
'matcher': 'path',
'expected': 'READY',
'argument': 'ResourceStatus',
'state': 'success'
},
{
'matcher': 'path',
'expected': 'FAILED',
'argument': 'ResourceStatus',
'state': 'failure'
}
]
}
}
}
# Create waiter model and waiter instance
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client('CustomResourceReady', waiter_model, client)
# Use custom waiter
custom_waiter.wait(ResourceId='resource-123')

Always configure appropriate timeouts for your use case.
# For long-running operations, increase max attempts
waiter.wait(
ResourceId='large-resource',
WaiterConfig={
'Delay': 30, # Check every 30 seconds
'MaxAttempts': 120 # Wait up to 1 hour (30s × 120 = 3600s)
}
)

Implement proper error handling and retry logic.
import time
from botocore.exceptions import WaiterError
def wait_with_retry(waiter, max_retries=3, **kwargs):
    """Wait with exponential backoff retry on failures.

    Args:
        waiter: Object whose ``wait(**kwargs)`` method performs the polling.
        max_retries: Number of additional runs allowed after the first run
            times out.
        **kwargs: Forwarded unchanged to ``waiter.wait``.

    Returns:
        bool: True as soon as ``waiter.wait`` returns. False only when
        ``max_retries`` is negative, in which case the waiter is never run.

    Raises:
        WaiterError: Immediately for any failure other than a timeout, or
            when the last permitted run also times out.
    """
    for attempt in range(max_retries + 1):
        try:
            waiter.wait(**kwargs)
        except WaiterError as e:
            if attempt == max_retries:
                raise
            # WaiterError stores its reason in ``kwargs``; it has no
            # ``reason`` attribute.
            if 'Max attempts exceeded' not in e.kwargs['reason']:
                # Don't retry on terminal failures
                raise
            # Exponential backoff before retry
            delay = 2 ** attempt * 60  # 1min, 2min, 4min
            time.sleep(delay)
        else:
            return True
    # Only reachable when the loop never ran (negative max_retries).
    return False


# Use waiters to ensure proper resource cleanup.
def cleanup_resources(ec2_client, instance_ids):
    """Safely terminate instances and wait for cleanup.

    Args:
        ec2_client: Client providing ``terminate_instances`` and ``get_waiter``.
        instance_ids: List of instance IDs to terminate. When empty, nothing
            is requested and the function returns immediately.

    Returns:
        None
    """
    if not instance_ids:
        # Nothing to terminate: skip both calls instead of sending a request
        # with an empty InstanceIds list.
        return

    # Terminate instances
    ec2_client.terminate_instances(InstanceIds=instance_ids)

    # Wait for termination to complete
    waiter = ec2_client.get_waiter('instance_terminated')
    waiter.wait(InstanceIds=instance_ids)
    print(f"All instances {instance_ids} have been terminated")


# Combine waiters with logging to monitor long-running operations.
import logging
logger = logging.getLogger(__name__)
def wait_with_progress(waiter, operation_name, **kwargs):
    """Wait with progress logging.

    Args:
        waiter: Waiter exposing ``wait(**kwargs)`` and a ``config`` object
            with ``delay`` and ``max_attempts`` attributes.
        operation_name: Label used in the log messages.
        **kwargs: Forwarded unchanged to ``waiter.wait``. An optional
            'WaiterConfig' dict supplies the 'Delay' / 'MaxAttempts' values
            reported in the start message.

    Raises:
        WaiterError: Re-raised after the failure reason has been logged.
    """
    config = kwargs.get('WaiterConfig', {})
    # Report the overrides when given, otherwise the waiter's own defaults.
    max_attempts = config.get('MaxAttempts', waiter.config.max_attempts)
    delay = config.get('Delay', waiter.config.delay)
    logger.info(
        "Starting %s - will check every %ss for up to %s attempts",
        operation_name, delay, max_attempts,
    )
    try:
        waiter.wait(**kwargs)
    except WaiterError as e:
        # WaiterError stores its reason in ``kwargs``; it has no ``reason``
        # attribute.
        logger.error("%s failed: %s", operation_name, e.kwargs['reason'])
        raise
    else:
        logger.info("%s completed successfully", operation_name)


# Install with Tessl CLI
npx tessl i tessl/pypi-botocore