tessl/pypi-watchtower

Python CloudWatch Logging handler that integrates Python logging system with Amazon Web Services CloudWatch Logs

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/watchtower@3.4.x

To install, run

npx @tessl/cli install tessl/pypi-watchtower@3.4.0


Watchtower

A lightweight Python logging handler that sends records from the standard logging module to Amazon Web Services CloudWatch Logs. Watchtower streams logs to CloudWatch without a system-wide log collector: it uses the boto3 AWS SDK, batches messages to limit API calls, holds messages no longer than a configurable interval before sending (send_interval, 60 seconds by default), and has built-in filtering to prevent infinite logging loops caused by the AWS SDK's own log output.

Package Information

  • Package Name: watchtower
  • Language: Python
  • Installation: pip install watchtower
  • Dependencies: boto3 >= 1.9.253, < 2

Core Imports

import watchtower

Common pattern for logging setup:

from watchtower import CloudWatchLogHandler, CloudWatchLogFormatter, WatchtowerError, WatchtowerWarning

For type annotations:

from typing import Any, Callable, Dict, List, Optional, Tuple
import botocore.client
import logging

Basic Usage

import watchtower
import logging

# Basic setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.addHandler(watchtower.CloudWatchLogHandler())

# Simple logging
logger.info("Hi")
logger.error("Something went wrong")

# Structured logging with JSON
logger.info(dict(foo="bar", details={"status": "success", "count": 42}))

Advanced Configuration

import logging

import boto3
import watchtower

# Custom configuration
handler = watchtower.CloudWatchLogHandler(
    log_group_name="my-application",
    log_stream_name="{machine_name}/{program_name}/{logger_name}",
    use_queues=True,
    send_interval=30,
    max_batch_size=512 * 1024,
    boto3_client=boto3.client("logs", region_name="us-west-2"),
    create_log_group=True,
    log_group_tags={"Environment": "production", "Service": "web-api"}
)

logger = logging.getLogger("myapp")
logger.setLevel(logging.INFO)  # otherwise INFO records are dropped at the default WARNING level
logger.addHandler(handler)
logger.info("Application started")

Architecture

Watchtower uses a thread-based queuing system for efficient log delivery:

  • CloudWatchLogHandler: Main handler that implements Python's logging.Handler interface
  • Message Queuing: Thread-safe queue system that batches messages for efficient delivery
  • Batch Processing: Automatic batching based on size, count, and time intervals
  • AWS Integration: Direct boto3 CloudWatch Logs API integration with retry logic
  • Stream Management: Automatic log group and stream creation with naming templates
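The size and count limits in the batching step can be pictured with a small standard-library sketch. This is not watchtower's implementation: the function name `split_into_batches` is hypothetical, the time-interval trigger is left out, and the 26-byte per-message overhead is taken from the EXTRA_MSG_PAYLOAD_SIZE constant listed in this document.

```python
from typing import Iterable, Iterator, List

# Assumed per-message overhead in bytes (mirrors EXTRA_MSG_PAYLOAD_SIZE = 26).
EXTRA_MSG_PAYLOAD_SIZE = 26


def split_into_batches(
    messages: Iterable[str],
    max_batch_size: int = 1024 * 1024,
    max_batch_count: int = 10000,
) -> Iterator[List[str]]:
    """Yield lists of messages that respect both the byte and the count limit."""
    batch: List[str] = []
    batch_bytes = 0
    for message in messages:
        message_bytes = len(message.encode("utf-8")) + EXTRA_MSG_PAYLOAD_SIZE
        # Close the current batch if adding this message would exceed a limit.
        if batch and (
            batch_bytes + message_bytes > max_batch_size
            or len(batch) >= max_batch_count
        ):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(message)
        batch_bytes += message_bytes
    if batch:
        yield batch


# Five 10-character messages cost 36 bytes each, so an 80-byte limit fits two per batch.
batches = list(split_into_batches(["a" * 10] * 5, max_batch_size=80))
print([len(b) for b in batches])  # [2, 2, 1]
```

A real handler would also flush a partially filled batch once send_interval seconds have passed, which this sketch does not model.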

Capabilities

CloudWatch Log Handler

The main handler class that integrates with Python's logging system to send logs to AWS CloudWatch Logs. Provides comprehensive configuration options for log grouping, streaming, batching, and AWS authentication.

Class Constants:

END: int = 1  # Signal for thread termination
FLUSH: int = 2  # Signal for immediate flush
FLUSH_TIMEOUT: int = 30  # Timeout for flush operations
EXTRA_MSG_PAYLOAD_SIZE: int = 26  # Extra metadata size per message

class CloudWatchLogHandler(logging.Handler):
    def __init__(
        self,
        log_group_name: str = __name__,
        log_stream_name: str = "{machine_name}/{program_name}/{logger_name}/{process_id}",
        use_queues: bool = True,
        send_interval: int = 60,
        max_batch_size: int = 1024 * 1024,
        max_batch_count: int = 10000,
        boto3_client: Optional[botocore.client.BaseClient] = None,
        boto3_profile_name: Optional[str] = None,
        create_log_group: bool = True,
        log_group_tags: Dict[str, str] = {},
        json_serialize_default: Optional[Callable] = None,
        log_group_retention_days: Optional[int] = None,
        create_log_stream: bool = True,
        max_message_size: int = 256 * 1024,
        log_group: Optional[str] = None,  # deprecated
        stream_name: Optional[str] = None,  # deprecated
        *args,
        **kwargs
    ):
        """
        Create a CloudWatch log handler.

        Parameters:
        - log_group_name: CloudWatch log group name (defaults to the module's __name__, i.e. "watchtower")
        - log_stream_name: Log stream name template with format placeholders
        - use_queues: Enable message queuing and batching (recommended: True)
        - send_interval: Maximum seconds to hold messages before sending
        - max_batch_size: Maximum batch size in bytes (AWS limit: 1,048,576)
        - max_batch_count: Maximum messages per batch (AWS limit: 10,000)
        - boto3_client: Custom boto3 logs client (botocore.client.BaseClient) for authentication/region
        - boto3_profile_name: AWS profile name for authentication
        - create_log_group: Auto-create log group if it doesn't exist
        - log_group_tags: Dictionary of tags to apply to log group
        - json_serialize_default: Custom JSON serialization function
        - log_group_retention_days: Log retention period in days
        - create_log_stream: Auto-create log stream if it doesn't exist
        - max_message_size: Maximum message size in bytes (default: 256KB)
        - log_group: (deprecated) Use log_group_name instead
        - stream_name: (deprecated) Use log_stream_name instead
        """

    def emit(self, record: logging.LogRecord) -> None:
        """Send a log record to CloudWatch Logs."""

    def flush(self) -> None:
        """Send any queued messages to CloudWatch immediately."""

    def close(self) -> None:
        """Send queued messages and prevent further processing."""

    def __repr__(self) -> str:
        """Return string representation of the handler."""

CloudWatch Log Formatter

Specialized formatter that handles JSON serialization for CloudWatch Logs, enabling structured logging with automatic recognition and indexing by CloudWatch.

class CloudWatchLogFormatter(logging.Formatter):
    add_log_record_attrs: Tuple[str, ...] = tuple()
    
    def __init__(
        self,
        *args,
        json_serialize_default: Optional[Callable] = None,
        add_log_record_attrs: Optional[Tuple[str, ...]] = None,
        **kwargs
    ):
        """
        Create a CloudWatch log formatter.

        Parameters:
        - json_serialize_default: Custom JSON serialization function for objects
        - add_log_record_attrs: Tuple of LogRecord attributes to include in messages
        """

    def format(self, record: logging.LogRecord) -> str:
        """
        Format log record for CloudWatch, handling JSON serialization.

        Parameters:
        - record: LogRecord instance to format

        Returns:
        str: Formatted log message, JSON string for dict messages
        """
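To make the formatter's documented behaviour concrete, here is a minimal standard-library sketch of the same idea: dict messages are serialized to JSON with selected LogRecord attributes merged in, and everything else falls back to normal formatting. `SketchJsonFormatter` and `serialize_default` are hypothetical stand-ins written for illustration, not watchtower's own classes.

```python
import json
import logging
from datetime import datetime, timezone
from typing import Any


def serialize_default(o: Any) -> str:
    """Fallback for json.dumps: ISO 8601 for datetimes, repr() otherwise."""
    if isinstance(o, datetime):
        return o.isoformat()
    return repr(o)


class SketchJsonFormatter(logging.Formatter):
    """Hypothetical stand-in that mimics the documented formatter behaviour."""

    add_log_record_attrs = ("levelname",)

    def format(self, record: logging.LogRecord) -> str:
        if isinstance(record.msg, dict):
            message = dict(record.msg)  # copy so the caller's dict is untouched
            for attr in self.add_log_record_attrs:
                message[attr] = getattr(record, attr)
            return json.dumps(message, default=serialize_default)
        return super().format(record)


record = logging.LogRecord(
    name="myapp", level=logging.INFO, pathname="example.py", lineno=1,
    msg={"event": "login", "at": datetime(2024, 1, 2, tzinfo=timezone.utc)},
    args=None, exc_info=None,
)
formatted = SketchJsonFormatter().format(record)
print(formatted)
```

Emitting one JSON object per log event is what lets CloudWatch treat the fields as structured data rather than as an opaque string.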

Exceptions and Warnings

Custom exception and warning classes for watchtower-specific error handling.

class WatchtowerError(Exception):
    """Default exception class for watchtower module errors."""

class WatchtowerWarning(UserWarning):
    """Default warning class for watchtower module warnings."""

Constants and Utility Functions

DEFAULT_LOG_STREAM_NAME: str = "{machine_name}/{program_name}/{logger_name}/{process_id}"

def _json_serialize_default(o: Any) -> str:
    """
    Standard JSON serializer function for CloudWatch log messages.
    
    Serializes datetime objects using .isoformat() method,
    and all other objects using repr().
    
    Parameters:
    - o: Object to serialize
    
    Returns:
    str: Serialized representation
    """

The log_stream_name template accepts the following format placeholders (the default template, DEFAULT_LOG_STREAM_NAME, uses the first four):

  • {machine_name}: Platform hostname
  • {program_name}: Program name from sys.argv[0]
  • {logger_name}: Logger name
  • {process_id}: Process ID
  • {thread_name}: Thread name
  • {strftime:%format}: UTC datetime formatting
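The placeholders above follow ordinary str.format rules, so a template can be expanded with plain Python. The sketch below assumes that mechanism and uses made-up values; `expand_stream_name` is a hypothetical helper, not part of watchtower. Passing a datetime under the name `strftime` works because datetime objects interpret a format spec as a strftime pattern.

```python
from datetime import datetime, timezone


def expand_stream_name(template: str, **values: object) -> str:
    """Fill a stream-name template using ordinary str.format rules."""
    return template.format(**values)


name = expand_stream_name(
    "{machine_name}/{program_name}/{logger_name}/{strftime:%Y-%m-%d}",
    machine_name="web-01",  # illustrative values, not real lookups
    program_name="app.py",
    logger_name="myapp",
    strftime=datetime(2024, 1, 2, tzinfo=timezone.utc),
)
print(name)  # web-01/app.py/myapp/2024-01-02
```

Including a date in the stream name is a simple way to start a new stream each day without changing application code.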

Usage Examples

Flask Integration

import watchtower
import flask
import logging

logging.basicConfig(level=logging.INFO)
app = flask.Flask("myapp")
handler = watchtower.CloudWatchLogHandler(log_group_name=app.name)
app.logger.addHandler(handler)
logging.getLogger("werkzeug").addHandler(handler)

@app.route('/')
def hello():
    app.logger.info("Request received", extra={"user_agent": flask.request.headers.get("User-Agent")})
    return 'Hello World!'

Django Integration

# In settings.py
import boto3

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'watchtower': {
            'class': 'watchtower.CloudWatchLogHandler',
            'log_group_name': 'django-app',
            'boto3_client': boto3.client('logs', region_name='us-east-1'),
            'create_log_group': True,
        },
    },
    'loggers': {
        'django': {
            'handlers': ['watchtower'],
            'level': 'INFO',
        },
    },
}

Structured Logging with Metadata

import watchtower
import logging
from datetime import datetime, timezone

# Configure handler with LogRecord attributes
handler = watchtower.CloudWatchLogHandler()
handler.formatter.add_log_record_attrs = ["levelname", "filename", "process", "thread"]

logger = logging.getLogger("myapp")
logger.addHandler(handler)

# Log structured data
logger.critical({
    "event": "user_login",
    "user_id": "12345",
    "timestamp": datetime.now(timezone.utc),  # timezone-aware; datetime.utcnow() is deprecated since Python 3.12
    "metadata": {
        "ip_address": "192.168.1.1",
        "user_agent": "Mozilla/5.0...",
        "session_id": "abc123"
    }
})

Custom JSON Serialization

import watchtower
import logging
from datetime import datetime, timezone
from decimal import Decimal

def custom_serializer(obj):
    """Custom JSON serializer for special types."""
    if isinstance(obj, Decimal):
        return float(obj)
    elif isinstance(obj, datetime):
        return obj.isoformat()
    return str(obj)

handler = watchtower.CloudWatchLogHandler(
    json_serialize_default=custom_serializer
)

logger = logging.getLogger("myapp")
logger.setLevel(logging.INFO)  # otherwise INFO records are dropped at the default WARNING level
logger.addHandler(handler)

# Log with custom types
logger.info({
    "price": Decimal("19.99"),
    "timestamp": datetime.now(timezone.utc),
    "status": "success"
})

Manual Flush and Close

import watchtower
import logging

handler = watchtower.CloudWatchLogHandler()
logger = logging.getLogger("myapp")
logger.setLevel(logging.INFO)  # otherwise INFO records are dropped at the default WARNING level
logger.addHandler(handler)

try:
    logger.info("Starting process")
    # ... application logic ...
    logger.info("Process completed")
finally:
    # Ensure all logs are sent before exit
    handler.flush()  # Send queued messages
    handler.close()  # Send final messages and shutdown
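The standard library offers a related safety net: logging.shutdown() flushes and closes every registered handler, and the logging module registers it to run at normal interpreter exit. The sketch below demonstrates that call order with a hypothetical `RecordingHandler` that records lifecycle calls instead of contacting AWS.

```python
import logging


class RecordingHandler(logging.Handler):
    """Hypothetical handler that records lifecycle calls instead of sending logs."""

    def __init__(self) -> None:
        super().__init__()
        self.events = []

    def emit(self, record: logging.LogRecord) -> None:
        self.events.append("emit")

    def flush(self) -> None:
        self.events.append("flush")

    def close(self) -> None:
        self.events.append("close")
        super().close()


handler = RecordingHandler()
logger = logging.getLogger("shutdown-demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("work done")

# Flushes, then closes, every handler that is still alive.
logging.shutdown()
print(handler.events)
```

An explicit flush() and close() in a finally block, as in the example above, is still the clearer choice when the process may be terminated without a normal interpreter exit.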

Error Handling

Common Warnings

# Empty message warning
logger.info("")  # Triggers WatchtowerWarning

# Oversized message warning (larger than max_message_size, 256 KB by default)
logger.info("x" * (300 * 1024))  # Message truncated, WatchtowerWarning

# Message after shutdown warning (keep this last: the handler is closed here)
handler.close()
logger.info("This triggers a warning")  # WatchtowerWarning
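The oversized-message case can be illustrated without AWS access. The following is a sketch under stated assumptions, not watchtower's code: `SketchWarning` stands in for watchtower.WatchtowerWarning, `truncate_message` is a hypothetical helper, and the limit is applied to the UTF-8 encoded size.

```python
import warnings


class SketchWarning(UserWarning):
    """Hypothetical stand-in for watchtower.WatchtowerWarning."""


def truncate_message(message: str, max_message_size: int = 256 * 1024) -> str:
    """Trim a message to at most max_message_size UTF-8 bytes, warning if trimmed."""
    encoded = message.encode("utf-8")
    if len(encoded) <= max_message_size:
        return message
    warnings.warn("Message exceeds the size limit and was truncated", SketchWarning)
    # errors="ignore" drops a multi-byte character cut in half by the slice.
    return encoded[:max_message_size].decode("utf-8", errors="ignore")


# Record warnings so they can be inspected instead of printed to stderr.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    truncated = truncate_message("x" * (300 * 1024))

print(len(truncated), [w.category.__name__ for w in caught])
```

The same warnings.catch_warnings pattern, or a filter such as warnings.simplefilter("error", ...), can be used in tests to turn silent truncation into a visible failure.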

AWS Authentication Errors

import watchtower
import boto3
from botocore.exceptions import NoCredentialsError, ClientError

try:
    handler = watchtower.CloudWatchLogHandler(
        boto3_client=boto3.client("logs", region_name="us-west-2")
    )
except NoCredentialsError:
    print("AWS credentials not configured")
except ClientError as e:
    print(f"AWS API error: {e}")

Configuration Validation

import boto3
import watchtower

try:
    # This raises WatchtowerError
    handler = watchtower.CloudWatchLogHandler(
        boto3_client=boto3.client("logs"),
        boto3_profile_name="myprofile"  # Can't specify both
    )
except watchtower.WatchtowerError as e:
    print(f"Configuration error: {e}")