Python CloudWatch Logging handler that integrates the Python logging system with Amazon Web Services CloudWatch Logs.

npx @tessl/cli install tessl/pypi-watchtower@3.4.0

A lightweight Python logging handler that integrates the Python logging system with Amazon Web Services CloudWatch Logs. Watchtower provides seamless log streaming to CloudWatch without requiring system-wide log collectors, using the boto3 AWS SDK for efficient log transmission with batching, delivery guarantees, and built-in filtering to prevent infinite logging loops from AWS SDK dependencies.

pip install watchtower

import watchtower

Common pattern for logging setup:
from watchtower import CloudWatchLogHandler, CloudWatchLogFormatter, WatchtowerError, WatchtowerWarning

For type annotations:
from typing import Any, Callable, Dict, List, Optional, Tuple
import botocore.client
import logging

import watchtower
import logging
# Basic setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.addHandler(watchtower.CloudWatchLogHandler())
# Simple logging
logger.info("Hi")
logger.error("Something went wrong")
# Structured logging with JSON
logger.info(dict(foo="bar", details={"status": "success", "count": 42}))

import boto3
import watchtower
# Custom configuration
handler = watchtower.CloudWatchLogHandler(
log_group_name="my-application",
log_stream_name="{machine_name}/{program_name}/{logger_name}",
use_queues=True,
send_interval=30,
max_batch_size=512 * 1024,
boto3_client=boto3.client("logs", region_name="us-west-2"),
create_log_group=True,
log_group_tags={"Environment": "production", "Service": "web-api"}
)
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.info("Application started")

Watchtower uses a thread-based queuing system for efficient log delivery:
The main handler class that integrates with Python's logging system to send logs to AWS CloudWatch Logs. Provides comprehensive configuration options for log grouping, streaming, batching, and AWS authentication.
Class Constants:
END: int = 1 # Signal for thread termination
FLUSH: int = 2 # Signal for immediate flush
FLUSH_TIMEOUT: int = 30 # Timeout for flush operations
EXTRA_MSG_PAYLOAD_SIZE: int = 26  # Extra metadata size per message

class CloudWatchLogHandler(logging.Handler):
def __init__(
    self,
    log_group_name: str = __name__,
    log_stream_name: str = "{machine_name}/{program_name}/{logger_name}/{process_id}",
    use_queues: bool = True,
    send_interval: int = 60,
    max_batch_size: int = 1024 * 1024,
    max_batch_count: int = 10000,
    boto3_client: Optional[botocore.client.BaseClient] = None,
    boto3_profile_name: Optional[str] = None,
    create_log_group: bool = True,
    # None (not a mutable {} default) — a shared mutable default dict would be
    # reused across every handler instance, a classic Python pitfall.
    log_group_tags: Optional[Dict[str, str]] = None,
    json_serialize_default: Optional[Callable] = None,
    log_group_retention_days: Optional[int] = None,
    create_log_stream: bool = True,
    max_message_size: int = 256 * 1024,
    log_group: Optional[str] = None,  # deprecated
    stream_name: Optional[str] = None,  # deprecated
    *args,
    **kwargs
):
    """
    Create a CloudWatch log handler.

    Parameters:
    - log_group_name: CloudWatch log group name
    - log_stream_name: Log stream name template with format placeholders
    - use_queues: Enable message queuing and batching (recommended: True)
    - send_interval: Maximum seconds to hold messages before sending
    - max_batch_size: Maximum batch size in bytes (AWS limit: 1,048,576)
    - max_batch_count: Maximum messages per batch (AWS limit: 10,000)
    - boto3_client: Custom boto3 logs client (botocore.client.BaseClient) for authentication/region
    - boto3_profile_name: AWS profile name for authentication
    - create_log_group: Auto-create log group if it doesn't exist
    - log_group_tags: Dictionary of tags to apply to log group; None means no tags
    - json_serialize_default: Custom JSON serialization function
    - log_group_retention_days: Log retention period in days
    - create_log_stream: Auto-create log stream if it doesn't exist
    - max_message_size: Maximum message size in bytes (default: 256KB)
    - log_group: (deprecated) Use log_group_name instead
    - stream_name: (deprecated) Use log_stream_name instead

    Raises:
    - WatchtowerError: if both boto3_client and boto3_profile_name are given
      (see the error-handling example later in this document)
    """
def emit(self, record: logging.LogRecord) -> None:
    """Send a log record to CloudWatch Logs.

    With use_queues enabled (the default), the record is queued for
    batched delivery by a background thread rather than sent inline.
    """
def flush(self) -> None:
    """Send any queued messages to CloudWatch immediately.

    Waits for delivery of pending messages (bounded by FLUSH_TIMEOUT).
    """
def close(self) -> None:
    """Send queued messages and prevent further processing.

    Records logged after close() trigger a WatchtowerWarning instead of
    being delivered (see the warnings examples later in this document).
    """
def __repr__(self) -> str:
    """Return string representation of the handler."""

Specialized formatter that handles JSON serialization for CloudWatch Logs, enabling structured logging with automatic recognition and indexing by CloudWatch.
class CloudWatchLogFormatter(logging.Formatter):
add_log_record_attrs: Tuple[str, ...] = tuple()
def __init__(
    self,
    *args,
    json_serialize_default: Optional[Callable] = None,
    add_log_record_attrs: Optional[Tuple[str, ...]] = None,
    **kwargs
):
    """
    Create a CloudWatch log formatter.

    Other positional and keyword arguments are passed through to
    logging.Formatter.

    Parameters:
    - json_serialize_default: Custom JSON serialization function for objects
      not natively handled by json.dumps
    - add_log_record_attrs: Tuple of LogRecord attribute names to include in
      messages; when None, presumably falls back to the class-level default
      (empty tuple) — confirm against the implementation
    """
def format(self, record: logging.LogRecord) -> str:
"""
Format log record for CloudWatch, handling JSON serialization.
Parameters:
- record: LogRecord instance to format
Returns:
str: Formatted log message, JSON string for dict messages
    """

Custom exception and warning classes for watchtower-specific error handling.
class WatchtowerError(Exception):
    # Root of the watchtower exception hierarchy; callers can catch this
    # to handle any watchtower-specific failure.
    """Default exception class for watchtower module errors."""
class WatchtowerWarning(UserWarning):
    """Default warning class for watchtower module warnings."""

DEFAULT_LOG_STREAM_NAME: str = "{machine_name}/{program_name}/{logger_name}/{process_id}"
def _json_serialize_default(o: Any) -> str:
"""
Standard JSON serializer function for CloudWatch log messages.
Serializes datetime objects using .isoformat() method,
and all other objects using repr().
Parameters:
- o: Object to serialize
Returns:
str: Serialized representation
"""Default log stream name template with format placeholders for:
{machine_name}: Platform hostname{program_name}: Program name from sys.argv[0]{logger_name}: Logger name{process_id}: Process ID{thread_name}: Thread name{strftime:%format}: UTC datetime formattingimport watchtower
import flask
import logging

logging.basicConfig(level=logging.INFO)
app = flask.Flask("myapp")
handler = watchtower.CloudWatchLogHandler(log_group_name=app.name)
app.logger.addHandler(handler)
logging.getLogger("werkzeug").addHandler(handler)

@app.route('/')
def hello():
    # Use flask.request: only `import flask` is in scope here, so a bare
    # `request` would raise NameError at request time.
    app.logger.info("Request received", extra={"user_agent": flask.request.headers.get("User-Agent")})
    return 'Hello World!'

# In settings.py
import boto3
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'watchtower': {
'class': 'watchtower.CloudWatchLogHandler',
'log_group_name': 'django-app',
'boto3_client': boto3.client('logs', region_name='us-east-1'),
'create_log_group': True,
},
},
'loggers': {
'django': {
'handlers': ['watchtower'],
'level': 'INFO',
},
},
}

import watchtower
import logging
from datetime import datetime
# Configure handler with LogRecord attributes
handler = watchtower.CloudWatchLogHandler()
handler.formatter.add_log_record_attrs = ["levelname", "filename", "process", "thread"]
logger = logging.getLogger("myapp")
logger.addHandler(handler)
# Log structured data
logger.critical({
"event": "user_login",
"user_id": "12345",
"timestamp": datetime.utcnow(),
"metadata": {
"ip_address": "192.168.1.1",
"user_agent": "Mozilla/5.0...",
"session_id": "abc123"
}
})

import watchtower
import logging
from datetime import datetime
from decimal import Decimal
def custom_serializer(obj):
    """Serialize values that json.dumps cannot handle natively.

    Decimal values become floats, datetimes become ISO-8601 strings,
    and anything else falls back to its str() form.
    """
    # Table-driven dispatch: (type, converter) pairs tried in order.
    for kind, convert in ((Decimal, float), (datetime, datetime.isoformat)):
        if isinstance(obj, kind):
            return convert(obj)
    return str(obj)
handler = watchtower.CloudWatchLogHandler(
json_serialize_default=custom_serializer
)
logger = logging.getLogger("myapp")
logger.addHandler(handler)
# Log with custom types
logger.info({
"price": Decimal("19.99"),
"timestamp": datetime.utcnow(),
"status": "success"
})

import watchtower
import logging
handler = watchtower.CloudWatchLogHandler()
logger = logging.getLogger("myapp")
logger.addHandler(handler)
try:
logger.info("Starting process")
# ... application logic ...
logger.info("Process completed")
finally:
# Ensure all logs are sent before exit
handler.flush() # Send queued messages
handler.close()  # Send final messages and shutdown

# Empty message warning
logger.info("") # Triggers WatchtowerWarning
# Message after shutdown warning
handler.close()
logger.info("This triggers a warning") # WatchtowerWarning
# Oversized message warning
logger.info("x" * (300 * 1024))  # Message truncated, WatchtowerWarning

import watchtower
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
try:
handler = watchtower.CloudWatchLogHandler(
boto3_client=boto3.client("logs", region_name="us-west-2")
)
except NoCredentialsError:
print("AWS credentials not configured")
except ClientError as e:
print(f"AWS API error: {e}")

import watchtower
try:
# This raises WatchtowerError
handler = watchtower.CloudWatchLogHandler(
boto3_client=boto3.client("logs"),
boto3_profile_name="myprofile" # Can't specify both
)
except watchtower.WatchtowerError as e:
print(f"Configuration error: {e}")