CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fluent-logger

A Python logging handler for Fluentd event collector

Pending
Overview
Eval results
Files

logging-integration.mddocs/

Python Logging Integration

Full logging.Handler compatibility with structured formatting, custom formatters, and seamless integration with existing Python logging workflows. This enables Fluentd integration without changing existing logging code.

Capabilities

FluentHandler Class

Python logging.Handler subclass that sends log records to Fluentd with structured formatting and full integration with the standard logging module.

class FluentHandler(logging.Handler):
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        msgpack_kwargs = None,
        nanosecond_precision: bool = False,
        **kwargs
    ):
        """
        Initialize FluentHandler.

        Parameters:
        - tag (str): Tag for log events
        - host (str): Fluentd host
        - port (int): Fluentd port
        - timeout (float): Connection timeout
        - verbose (bool): Verbose mode
        - buffer_overflow_handler (callable): Buffer overflow handler
        - msgpack_kwargs (dict): msgpack options
        - nanosecond_precision (bool): Use nanosecond timestamps
        - **kwargs: Additional FluentSender options
        """

    def emit(self, record) -> bool:
        """
        Emit a log record to Fluentd.

        Parameters:
        - record (LogRecord): Python logging.LogRecord instance

        Returns:
        bool: Success status from underlying sender
        """

    def close(self) -> None:
        """Close handler and underlying sender."""

    def getSenderClass(self):
        """
        Get the sender class to use.

        Returns:
        class: FluentSender class
        """

    def getSenderInstance(
        self,
        tag: str,
        host: str,
        port: int,
        timeout: float,
        verbose: bool,
        buffer_overflow_handler,
        msgpack_kwargs,
        nanosecond_precision: bool,
        **kwargs
    ):
        """
        Create sender instance.

        Returns:
        FluentSender: Configured sender instance
        """

    @property
    def sender(self):
        """
        Get sender instance (lazy initialization).

        Returns:
        FluentSender: The underlying sender instance
        """

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager, closes handler."""

FluentRecordFormatter Class

Structured formatter for converting Python log records into Fluentd-compatible structured data with flexible formatting options.

class FluentRecordFormatter(logging.Formatter):
    def __init__(
        self,
        fmt = None,
        datefmt = None,
        style: str = "%",
        fill_missing_fmt_key: bool = False,
        format_json: bool = True,
        exclude_attrs = None
    ):
        """
        Initialize FluentRecordFormatter.

        Parameters:
        - fmt (dict/callable): Format specification dict or callable
        - datefmt (str): Date format string
        - style (str): Format style ('%', '{', or '$')
        - fill_missing_fmt_key (bool): Fill missing format keys with None
        - format_json (bool): Parse message as JSON if possible
        - exclude_attrs (iterable): Attributes to exclude from record
        """

    def format(self, record) -> dict:
        """
        Format log record as structured data.

        Parameters:
        - record (LogRecord): Python logging.LogRecord

        Returns:
        dict: Structured log data for Fluentd
        """

    def usesTime(self) -> bool:
        """
        Check if formatter uses time.

        Returns:
        bool: True if formatter uses asctime
        """

Usage Examples

Basic Logging Handler

import logging
from fluent import handler

# Configure logging with FluentHandler
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('myapp')

# Add Fluent handler
fluent_handler = handler.FluentHandler('app.logs')
logger.addHandler(fluent_handler)

# Use standard logging - automatically sent to Fluentd
logger.info('Application started')
logger.warning('High memory usage detected')
logger.error('Database connection failed')

# Cleanup
fluent_handler.close()

Structured Logging with Custom Formatter

import logging
from fluent import handler

# Custom format for structured logs
custom_format = {
    'host': '%(hostname)s',
    'where': '%(module)s.%(funcName)s',
    'level': '%(levelname)s',
    'stack_trace': '%(exc_text)s'
}

# Setup logger with custom formatter
logger = logging.getLogger('structured_app')
logger.setLevel(logging.DEBUG)

fluent_handler = handler.FluentHandler('app.structured', host='logs.company.com')
formatter = handler.FluentRecordFormatter(custom_format)
fluent_handler.setFormatter(formatter)
logger.addHandler(fluent_handler)

# Structured logging
logger.info('User action performed', extra={
    'user_id': 123,
    'action': 'login',
    'ip_address': '192.168.1.1'
})

logger.error('Payment processing failed', extra={
    'order_id': 'ord-456',
    'error_code': 'CARD_DECLINED',
    'amount': 99.99
})

fluent_handler.close()

JSON Message Parsing

import logging
import json
from fluent import handler

logger = logging.getLogger('json_app')
fluent_handler = handler.FluentHandler('app.json')

# Formatter will automatically parse JSON strings
formatter = handler.FluentRecordFormatter(format_json=True)
fluent_handler.setFormatter(formatter)
logger.addHandler(fluent_handler)

# Send JSON string - automatically parsed into structured data
json_data = json.dumps({
    'event': 'user_signup',
    'user_id': 789,
    'email': 'user@example.com',
    'source': 'web'
})

logger.info(json_data)

# Send dictionary directly
logger.info({
    'event': 'purchase',
    'user_id': 789,
    'product_id': 'prod-123',
    'amount': 29.99
})

# Send plain string - becomes 'message' field
logger.info('This will be a message field')

fluent_handler.close()

Configuration via logging.config

import logging.config

# YAML/dict configuration
config = {
    'version': 1,
    'formatters': {
        'fluent_fmt': {
            '()': 'fluent.handler.FluentRecordFormatter',
            'format': {
                'level': '%(levelname)s',
                'hostname': '%(hostname)s',
                'where': '%(module)s.%(funcName)s',
                'stack_trace': '%(exc_text)s'
            }
        }
    },
    'handlers': {
        'fluent': {
            'class': 'fluent.handler.FluentHandler',
            'tag': 'myapp.logs',
            'host': 'localhost',
            'port': 24224,
            'formatter': 'fluent_fmt',
            'level': 'INFO'
        },
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'fluent_fmt'
        }
    },
    'loggers': {
        'myapp': {
            'handlers': ['fluent', 'console'],
            'level': 'DEBUG',
            'propagate': False
        }
    }
}

logging.config.dictConfig(config)

# Use configured logger
logger = logging.getLogger('myapp')
logger.info('Application configured via dictConfig')
logger.error('Error with stack trace', exc_info=True)

Buffer Overflow Handling

import logging
import msgpack
from io import BytesIO
from fluent import handler

def log_overflow_handler(pendings):
    """Handle log buffer overflow"""
    print(f"Log buffer overflow: {len(pendings)} bytes")
    
    # Save to local file as backup
    with open('/tmp/fluent_overflow.log', 'ab') as f:
        f.write(pendings)
    
    # Parse and print lost log entries
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for log_entry in unpacker:
        print(f"Lost log: {log_entry}")

# Setup handler with overflow protection
logger = logging.getLogger('overflow_test')
fluent_handler = handler.FluentHandler(
    'app.logs',
    host='unreliable-server.example.com',
    buffer_overflow_handler=log_overflow_handler,
    bufmax=1024  # Small buffer for testing
)

logger.addHandler(fluent_handler)

# Generate lots of logs to test overflow
for i in range(1000):
    logger.info(f'Log message {i}', extra={'data': 'x' * 100})

fluent_handler.close()

Context Manager Usage

import logging
from fluent import handler

logger = logging.getLogger('context_app')

# Automatic cleanup with context manager
with handler.FluentHandler('app.session') as fluent_handler:
    logger.addHandler(fluent_handler)
    
    logger.info('Session started')
    logger.info('Processing user request')
    logger.info('Session ended')
    
    # Handler automatically closed on exit

Exception Logging

import logging
from fluent import handler

logger = logging.getLogger('error_app')
fluent_handler = handler.FluentHandler('app.errors')

# Custom formatter to include stack traces
formatter = handler.FluentRecordFormatter({
    'level': '%(levelname)s',
    'message': '%(message)s',
    'module': '%(module)s',
    'function': '%(funcName)s',
    'exception': '%(exc_text)s'
})

fluent_handler.setFormatter(formatter)
logger.addHandler(fluent_handler)

try:
    # Code that might raise an exception
    result = 10 / 0
except ZeroDivisionError as e:
    # Log with full exception details
    logger.error('Division by zero error', exc_info=True, extra={
        'operation': 'division',
        'numerator': 10,
        'denominator': 0
    })

fluent_handler.close()

Multi-Handler Setup

import logging
from fluent import handler

# Setup multiple handlers for different log levels
logger = logging.getLogger('multi_handler_app')
logger.setLevel(logging.DEBUG)

# Handler for all logs
all_logs_handler = handler.FluentHandler('app.all')
all_logs_handler.setLevel(logging.DEBUG)

# Handler for errors only
error_handler = handler.FluentHandler('app.errors', host='alerts.company.com')
error_handler.setLevel(logging.ERROR)

# Handler for performance metrics
perf_handler = handler.FluentHandler('app.performance')
perf_handler.setLevel(logging.INFO)
perf_filter = lambda record: 'performance' in record.getMessage().lower()
perf_handler.addFilter(perf_filter)

logger.addHandler(all_logs_handler)
logger.addHandler(error_handler)
logger.addHandler(perf_handler)

# Different log levels go to appropriate handlers
logger.debug('Debug information')  # Only to all_logs_handler
logger.info('Performance metric: response time 150ms')  # To all_logs and perf handlers
logger.error('Critical system error')  # To all handlers

# Cleanup
all_logs_handler.close()
error_handler.close()
perf_handler.close()

Install with Tessl CLI

npx tessl i tessl/pypi-fluent-logger

docs

async-communication.md

core-sender.md

event-interface.md

index.md

logging-integration.md

tile.json