A Python logging handler for Fluentd event collector
A drop-in `logging.Handler` subclass that forwards log records to a Fluentd event collector. It provides full compatibility with the standard `logging` module — structured formatting, custom formatters, and seamless integration with existing Python logging workflows — enabling Fluentd integration without changing existing logging code.
class FluentHandler(logging.Handler):
    """A ``logging.Handler`` subclass that sends log records to Fluentd.

    Integrates with the standard logging module: records passed to this
    handler are forwarded to a Fluentd event collector via an underlying
    FluentSender, with structured formatting applied by the configured
    formatter.
    """

    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler=None,
        msgpack_kwargs=None,
        nanosecond_precision: bool = False,
        **kwargs
    ):
        """
        Initialize FluentHandler.

        Parameters:
        - tag (str): Tag for log events
        - host (str): Fluentd host
        - port (int): Fluentd port
        - timeout (float): Connection timeout
        - verbose (bool): Verbose mode
        - buffer_overflow_handler (callable): Buffer overflow handler
        - msgpack_kwargs (dict): msgpack options
        - nanosecond_precision (bool): Use nanosecond timestamps
        - **kwargs: Additional FluentSender options
        """

    def emit(self, record) -> bool:
        """
        Emit a log record to Fluentd.

        Parameters:
        - record (LogRecord): Python logging.LogRecord instance

        Returns:
        bool: Success status from underlying sender
        """

    def close(self) -> None:
        """Close handler and underlying sender."""

    def getSenderClass(self):
        """
        Get the sender class to use.

        Returns:
        class: FluentSender class
        """

    def getSenderInstance(
        self,
        tag: str,
        host: str,
        port: int,
        timeout: float,
        verbose: bool,
        buffer_overflow_handler,
        msgpack_kwargs,
        nanosecond_precision: bool,
        **kwargs
    ):
        """
        Create sender instance.

        Returns:
        FluentSender: Configured sender instance
        """

    @property
    def sender(self):
        """
        Get sender instance (lazy initialization).

        Returns:
        FluentSender: The underlying sender instance
        """

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager, closes handler."""


# Structured formatter for converting Python log records into
# Fluentd-compatible structured data with flexible formatting options.
class FluentRecordFormatter(logging.Formatter):
    """Structured formatter that converts Python log records into
    Fluentd-compatible structured data (dicts) with flexible formatting
    options.
    """

    def __init__(
        self,
        fmt=None,
        datefmt=None,
        style: str = "%",
        fill_missing_fmt_key: bool = False,
        format_json: bool = True,
        exclude_attrs=None
    ):
        """
        Initialize FluentRecordFormatter.

        Parameters:
        - fmt (dict/callable): Format specification dict or callable
        - datefmt (str): Date format string
        - style (str): Format style ('%', '{', or '$')
        - fill_missing_fmt_key (bool): Fill missing format keys with None
        - format_json (bool): Parse message as JSON if possible
        - exclude_attrs (iterable): Attributes to exclude from record
        """

    def format(self, record) -> dict:
        """
        Format log record as structured data.

        Parameters:
        - record (LogRecord): Python logging.LogRecord

        Returns:
        dict: Structured log data for Fluentd
        """

    def usesTime(self) -> bool:
        """
        Check if formatter uses time.

        Returns:
        bool: True if formatter uses asctime
        """


import logging
from fluent import handler

# Configure logging with FluentHandler
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('myapp')

# Add Fluent handler
fluent_handler = handler.FluentHandler('app.logs')
logger.addHandler(fluent_handler)

# Use standard logging - automatically sent to Fluentd
logger.info('Application started')
logger.warning('High memory usage detected')
logger.error('Database connection failed')

# Cleanup
fluent_handler.close()

import logging
from fluent import handler

# Custom format for structured logs
custom_format = {
    'host': '%(hostname)s',
    'where': '%(module)s.%(funcName)s',
    'level': '%(levelname)s',
    'stack_trace': '%(exc_text)s'
}

# Setup logger with custom formatter
logger = logging.getLogger('structured_app')
logger.setLevel(logging.DEBUG)
fluent_handler = handler.FluentHandler('app.structured', host='logs.company.com')
formatter = handler.FluentRecordFormatter(custom_format)
fluent_handler.setFormatter(formatter)
logger.addHandler(fluent_handler)

# Structured logging
logger.info('User action performed', extra={
    'user_id': 123,
    'action': 'login',
    'ip_address': '192.168.1.1'
})
logger.error('Payment processing failed', extra={
    'order_id': 'ord-456',
    'error_code': 'CARD_DECLINED',
    'amount': 99.99
})

fluent_handler.close()

import logging
import json
from fluent import handler

logger = logging.getLogger('json_app')
fluent_handler = handler.FluentHandler('app.json')

# Formatter will automatically parse JSON strings
formatter = handler.FluentRecordFormatter(format_json=True)
fluent_handler.setFormatter(formatter)
logger.addHandler(fluent_handler)

# Send JSON string - automatically parsed into structured data
json_data = json.dumps({
    'event': 'user_signup',
    'user_id': 789,
    'email': 'user@example.com',
    'source': 'web'
})
logger.info(json_data)

# Send dictionary directly
logger.info({
    'event': 'purchase',
    'user_id': 789,
    'product_id': 'prod-123',
    'amount': 29.99
})

# Send plain string - becomes 'message' field
logger.info('This will be a message field')

fluent_handler.close()

import logging.config
# YAML/dict configuration
config = {
    'version': 1,
    'formatters': {
        'fluent_fmt': {
            '()': 'fluent.handler.FluentRecordFormatter',
            'format': {
                'level': '%(levelname)s',
                'hostname': '%(hostname)s',
                'where': '%(module)s.%(funcName)s',
                'stack_trace': '%(exc_text)s'
            }
        }
    },
    'handlers': {
        'fluent': {
            'class': 'fluent.handler.FluentHandler',
            'tag': 'myapp.logs',
            'host': 'localhost',
            'port': 24224,
            'formatter': 'fluent_fmt',
            'level': 'INFO'
        },
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'fluent_fmt'
        }
    },
    'loggers': {
        'myapp': {
            'handlers': ['fluent', 'console'],
            'level': 'DEBUG',
            'propagate': False
        }
    }
}
logging.config.dictConfig(config)

# Use configured logger
logger = logging.getLogger('myapp')
logger.info('Application configured via dictConfig')
logger.error('Error with stack trace', exc_info=True)

import logging
import msgpack
from io import BytesIO
from fluent import handler


def log_overflow_handler(pendings):
    """Handle log buffer overflow"""
    print(f"Log buffer overflow: {len(pendings)} bytes")
    # Save to local file as backup
    with open('/tmp/fluent_overflow.log', 'ab') as f:
        f.write(pendings)
    # Parse and print lost log entries
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for log_entry in unpacker:
        print(f"Lost log: {log_entry}")


# Setup handler with overflow protection
logger = logging.getLogger('overflow_test')
fluent_handler = handler.FluentHandler(
    'app.logs',
    host='unreliable-server.example.com',
    buffer_overflow_handler=log_overflow_handler,
    bufmax=1024  # Small buffer for testing
)
logger.addHandler(fluent_handler)

# Generate lots of logs to test overflow
for i in range(1000):
    logger.info(f'Log message {i}', extra={'data': 'x' * 100})

fluent_handler.close()

import logging
from fluent import handler

logger = logging.getLogger('context_app')

# Automatic cleanup with context manager
with handler.FluentHandler('app.session') as fluent_handler:
    logger.addHandler(fluent_handler)
    logger.info('Session started')
    logger.info('Processing user request')
    logger.info('Session ended')
# Handler automatically closed on exit

import logging
from fluent import handler

logger = logging.getLogger('error_app')
fluent_handler = handler.FluentHandler('app.errors')

# Custom formatter to include stack traces
formatter = handler.FluentRecordFormatter({
    'level': '%(levelname)s',
    'message': '%(message)s',
    'module': '%(module)s',
    'function': '%(funcName)s',
    'exception': '%(exc_text)s'
})
fluent_handler.setFormatter(formatter)
logger.addHandler(fluent_handler)

try:
    # Code that might raise an exception
    result = 10 / 0
except ZeroDivisionError as e:
    # Log with full exception details
    logger.error('Division by zero error', exc_info=True, extra={
        'operation': 'division',
        'numerator': 10,
        'denominator': 0
    })

fluent_handler.close()

import logging
from fluent import handler

# Setup multiple handlers for different log levels
logger = logging.getLogger('multi_handler_app')
logger.setLevel(logging.DEBUG)

# Handler for all logs
all_logs_handler = handler.FluentHandler('app.all')
all_logs_handler.setLevel(logging.DEBUG)

# Handler for errors only
error_handler = handler.FluentHandler('app.errors', host='alerts.company.com')
error_handler.setLevel(logging.ERROR)

# Handler for performance metrics
perf_handler = handler.FluentHandler('app.performance')
perf_handler.setLevel(logging.INFO)
perf_filter = lambda record: 'performance' in record.getMessage().lower()
perf_handler.addFilter(perf_filter)

logger.addHandler(all_logs_handler)
logger.addHandler(error_handler)
logger.addHandler(perf_handler)

# Different log levels go to appropriate handlers
logger.debug('Debug information')  # Only to all_logs_handler
logger.info('Performance metric: response time 150ms')  # To all_logs and perf handlers
logger.error('Critical system error')  # To all handlers

# Cleanup
all_logs_handler.close()
error_handler.close()
perf_handler.close()

# Install with Tessl CLI
npx tessl i tessl/pypi-fluent-logger