A Python logging handler for Fluentd event collector
—
Direct FluentSender interface for programmatic event emission with full control over connection parameters, error handling, and message formatting. This provides the lowest-level interface to Fluentd with maximum flexibility.
Main synchronous sender class that manages connections to Fluentd servers and handles event transmission with buffering and error recovery.
class FluentSender:
def __init__(
self,
tag: str,
host: str = "localhost",
port: int = 24224,
bufmax: int = 1048576,
timeout: float = 3.0,
verbose: bool = False,
buffer_overflow_handler = None,
nanosecond_precision: bool = False,
msgpack_kwargs = None,
*,
forward_packet_error: bool = True,
**kwargs
):
"""
Initialize FluentSender.
Parameters:
- tag (str): Tag prefix for events
- host (str): Fluentd host, supports "unix://" URLs for Unix sockets
- port (int): Fluentd port for TCP connections
- bufmax (int): Maximum buffer size in bytes (default 1MB)
- timeout (float): Connection timeout in seconds
- verbose (bool): Enable verbose logging of packets
- buffer_overflow_handler (callable): Handler for buffer overflow events
- nanosecond_precision (bool): Use nanosecond-precision timestamps
- msgpack_kwargs (dict): Additional msgpack serialization options
- forward_packet_error (bool): Forward packet errors as events
"""
def emit(self, label: str, data: dict) -> bool:
"""
Emit event with current timestamp.
Parameters:
- label (str): Event label (combined with tag as 'tag.label')
- data (dict): Event data dictionary
Returns:
bool: True if successful, False if error occurred
"""
def emit_with_time(self, label: str, timestamp, data: dict) -> bool:
"""
Emit event with specific timestamp.
Parameters:
- label (str): Event label
- timestamp: Unix timestamp (int/float) or EventTime instance
- data (dict): Event data dictionary
Returns:
bool: True if successful, False if error occurred
"""
def close(self) -> None:
"""
Close sender and flush any pending events.
Calls buffer_overflow_handler for any remaining pending events.
"""
def clear_last_error(self, _thread_id=None) -> None:
"""
Clear the last error from thread-local storage.
Parameters:
- _thread_id: Internal parameter for thread identification
"""
@property
def last_error(self):
"""
Get the last error that occurred (thread-local).
Returns:
Exception or None: Last error for current thread
"""
@last_error.setter
def last_error(self, err):
"""Set the last error for current thread."""
def __enter__(self):
"""Enter context manager."""
def __exit__(self, typ, value, traceback):
"""Exit context manager, closes sender."""Internal functions available for testing purposes:
def _set_global_sender(sender):
"""
[For testing] Set global sender directly.
Parameters:
- sender (FluentSender): Sender instance to use as global sender
"""Module-level functions for managing a global FluentSender instance, providing a singleton pattern for application-wide logging.
def setup(tag: str, **kwargs) -> None:
"""
Initialize global FluentSender instance.
Parameters:
- tag (str): Tag prefix for events
- **kwargs: Additional FluentSender constructor arguments
"""
def get_global_sender():
"""
Get the global FluentSender instance.
Returns:
FluentSender or None: Global sender instance
"""
def close() -> None:
"""Close the global FluentSender instance."""Specialized timestamp class for nanosecond-precision logging, implemented as msgpack ExtType for efficient serialization.
class EventTime:
def __new__(cls, timestamp: float, nanoseconds: int = None):
"""
Create EventTime instance.
Parameters:
- timestamp (float): Unix timestamp in seconds
- nanoseconds (int, optional): Nanosecond component, calculated from timestamp if not provided
Returns:
EventTime: New EventTime instance (msgpack ExtType)
"""
@classmethod
def from_unix_nano(cls, unix_nano: int):
"""
Create EventTime from nanosecond timestamp.
Parameters:
- unix_nano (int): Unix timestamp in nanoseconds
Returns:
EventTime: New EventTime instance
"""from fluent import sender
# Create sender for local Fluentd
logger = sender.FluentSender('app')
# Send events
logger.emit('user.login', {'user_id': 123, 'ip': '192.168.1.1'})
logger.emit('user.action', {'user_id': 123, 'action': 'click', 'target': 'button'})
logger.close()from fluent import sender
# Connect to remote Fluentd server
logger = sender.FluentSender('app', host='fluentd.example.com', port=24224)
# Send event
result = logger.emit('purchase', {
'user_id': 456,
'product_id': 'prod-123',
'amount': 29.99,
'currency': 'USD'
})
if not result:
print(f"Failed to send event: {logger.last_error}")
logger.clear_last_error()
logger.close()from fluent import sender
# Connect via Unix socket
logger = sender.FluentSender('app', host='unix:///var/run/fluentd.sock')
logger.emit('system.alert', {'level': 'warning', 'message': 'Disk usage high'})
logger.close()import time
from fluent import sender
# Enable nanosecond precision
logger = sender.FluentSender('app', nanosecond_precision=True)
# Automatic nanosecond timestamp
logger.emit('timing.event', {'operation': 'database_query', 'duration_ms': 150})
# Manual timestamp with nanosecond precision
timestamp = time.time()
logger.emit_with_time('timing.precise', timestamp, {'value': 42})
# Using EventTime directly
event_time = sender.EventTime.from_unix_nano(time.time_ns())
logger.emit_with_time('timing.nano', event_time, {'precision': 'nanosecond'})
logger.close()import msgpack
from io import BytesIO
from fluent import sender
def handle_overflow(pendings):
"""Custom handler for buffer overflow"""
print(f"Buffer overflow! {len(pendings)} bytes pending")
# Parse pending events
unpacker = msgpack.Unpacker(BytesIO(pendings))
for event in unpacker:
print(f"Lost event: {event}")
# Create sender with overflow handler
logger = sender.FluentSender(
'app',
host='unreliable-host.example.com',
bufmax=1024, # Small buffer for demonstration
buffer_overflow_handler=handle_overflow
)
# Send events (some may trigger overflow if connection fails)
for i in range(100):
logger.emit('test', {'index': i, 'data': 'x' * 100})
logger.close()from fluent import sender
# Automatic cleanup with context manager
with sender.FluentSender('app') as logger:
logger.emit('session.start', {'user_id': 789})
logger.emit('session.action', {'action': 'view_page', 'page': '/home'})
logger.emit('session.end', {'duration': 120})
# Sender is automatically closedfrom fluent import sender
# Setup global sender at application start
sender.setup('myapp', host='logs.company.com', port=24224)
# Use global sender anywhere in application
def process_user_action(user_id, action):
global_sender = sender.get_global_sender()
global_sender.emit('user.action', {
'user_id': user_id,
'action': action,
'timestamp': time.time()
})
# Application shutdown
def shutdown():
sender.close()Install with Tessl CLI
npx tessl i tessl/pypi-fluent-logger